Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "perf-attribution")]
8use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
9use crate::{
10    db::{
11        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13        access::AccessStrategy,
14        commit::CommitSchemaFingerprint,
15        cursor::{
16            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17        },
18        diagnostics::ExecutionTrace,
19        executor::{
20            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21            SharedPreparedExecutionPlan,
22        },
23        query::builder::{
24            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25        },
26        query::explain::{
27            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28        },
29        query::{
30            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32        },
33    },
34    error::InternalError,
35    model::entity::EntityModel,
36    traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
// Hasher builder shared by the in-heap query-plan cache maps in this module:
// a `BuildHasherDefault` over `icydb_utils::Xxh3`.
type CacheBuildHasher = BuildHasherDefault<Xxh3>;

// Bump this when the shared lower query-plan cache key meaning changes in a
// way that must force old in-heap entries to miss instead of aliasing.
// NOTE(review): the key constructors that would consume this constant are not
// defined in this part of the file — confirm they read it.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
///
/// QueryPlanVisibility
///
/// Index-visibility state of one recovered store as seen by the planner.
/// `StoreReady` is selected when the store's index state equals
/// `IndexState::Ready` and exposes the model's indexes to planning;
/// `StoreNotReady` is every other state and exposes no indexes.
/// The value is also one component of `QueryPlanCacheKey`.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
    StoreNotReady,
    StoreReady,
}
56
///
/// QueryPlanCacheKey
///
/// Hashable identity of one entry in the shared query-plan cache. Two lookups
/// share an entry only when every field below compares equal.
///
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
    // Version tag for the key's meaning.
    // NOTE(review): presumably seeded from
    // `SHARED_QUERY_PLAN_CACHE_METHOD_VERSION` by constructors defined
    // elsewhere — confirm.
    cache_method_version: u8,
    // Static path identifying the entity the plan was built for.
    entity_path: &'static str,
    // Commit schema fingerprint supplied by the caller for that entity.
    schema_fingerprint: CommitSchemaFingerprint,
    // Index visibility the plan was built under.
    visibility: QueryPlanVisibility,
    // Cache-key projection of the structural query.
    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
65
///
/// QueryPlanCacheEntry
///
/// One cached planning result: a logical access plan paired with a shared
/// prepared execution plan. The cache-miss path in this module builds both
/// halves from the same planner output.
///
#[derive(Clone, Debug)]
pub(in crate::db) struct QueryPlanCacheEntry {
    logical_plan: AccessPlannedQuery,
    prepared_plan: SharedPreparedExecutionPlan,
}

impl QueryPlanCacheEntry {
    /// Pair one logical plan with its shared prepared execution plan.
    #[must_use]
    pub(in crate::db) const fn new(
        logical_plan: AccessPlannedQuery,
        prepared_plan: SharedPreparedExecutionPlan,
    ) -> Self {
        Self {
            logical_plan,
            prepared_plan,
        }
    }

    /// Borrow the cached logical plan.
    #[must_use]
    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
        &self.logical_plan
    }

    /// Produce a prepared execution plan typed for entity `E` by calling
    /// `typed_clone::<E>()` on the shared prepared plan.
    #[must_use]
    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
        self.prepared_plan.typed_clone::<E>()
    }

    /// Borrow the shared (entity-erased) prepared execution plan.
    #[must_use]
    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
        &self.prepared_plan
    }
}
99
// One query-plan cache: cache key -> cached entry, hashed with
// `CacheBuildHasher`.
pub(in crate::db) type QueryPlanCache =
    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;

thread_local! {
    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
    // facades can share prepared logical plans across update/query calls while
    // tests and multi-registry host processes remain isolated by registry
    // identity.
    //
    // The outer map is keyed by the `usize` scope id a session obtains from
    // `query_plan_cache_scope_id()`; inner caches are created on first use.
    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
111
///
/// QueryPlanCacheAttribution
///
/// Hit/miss counters for shared query-plan cache lookups. Each lookup in this
/// module reports exactly one of `hit()` or `miss()`.
///
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
    pub hits: u64,
    pub misses: u64,
}

impl QueryPlanCacheAttribution {
    // Attribution for one lookup served from the cache.
    #[must_use]
    const fn hit() -> Self {
        Self { hits: 1, misses: 0 }
    }

    // Attribution for one lookup that had to build a new plan.
    #[must_use]
    const fn miss() -> Self {
        Self { hits: 0, misses: 1 }
    }
}
129
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution records the top-level compile/execute split for
/// typed/fluent query execution at the session boundary.
///
/// `DbSession::execute_query_result_with_attribution` assembles this payload:
/// `compile_local_instructions` is the measured cost of the cached
/// prepared-plan lookup/build, and the runtime, finalize, direct-data-row, and
/// grouped fields are copied from the per-phase execution attribution.
/// Stage measurements taken in this module read the local instruction
/// counter, which is always zero off `wasm32`.
///
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    pub compile_local_instructions: u64,
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub response_decode_local_instructions: u64,
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
162
// Session-local union of the scalar and grouped execute-phase counters.
// The scalar and grouped executor attributions are each projected into this
// one shape; counters that do not apply to the executed family are zero.
#[cfg(feature = "perf-attribution")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    grouped_count_borrowed_hash_computations: u64,
    grouped_count_bucket_candidate_checks: u64,
    grouped_count_existing_group_hits: u64,
    grouped_count_new_group_inserts: u64,
}
183
// Read the local instruction counter used for query stage attribution.
// On `wasm32` this is `canic_cdk::api::performance_counter(1)`; every other
// target reports 0, so stage deltas measured off-wasm are always 0.
// NOTE(review): the meaning of counter type `1` is defined by the CDK, not
// here — confirm against the CDK documentation.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
200
// Run one fallible stage exactly once and return its result together with the
// local instruction-counter delta observed around it. The delta saturates at
// zero rather than wrapping if the counter reads lower afterwards.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let counter_before = read_query_local_instruction_counter();
    let outcome = run();
    let counter_after = read_query_local_instruction_counter();

    (counter_after.saturating_sub(counter_before), outcome)
}
209
210impl<C: CanisterKind> DbSession<C> {
    // Build the all-zero execute-phase attribution, used by the delete arm of
    // the scalar attribution path, which reports no phase-level counters.
    // Every field is spelled out because this is a `const fn` and the derived
    // `Default` impl is not callable in const context.
    #[cfg(feature = "perf-attribution")]
    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
        QueryExecutePhaseAttribution {
            runtime_local_instructions: 0,
            finalize_local_instructions: 0,
            direct_data_row_scan_local_instructions: 0,
            direct_data_row_key_stream_local_instructions: 0,
            direct_data_row_row_read_local_instructions: 0,
            direct_data_row_key_encode_local_instructions: 0,
            direct_data_row_store_get_local_instructions: 0,
            direct_data_row_order_window_local_instructions: 0,
            direct_data_row_page_window_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count_borrowed_hash_computations: 0,
            grouped_count_bucket_candidate_checks: 0,
            grouped_count_existing_group_hits: 0,
            grouped_count_new_group_inserts: 0,
        }
    }
232
    // Project the executor's scalar phase attribution into the session-local
    // shape: runtime, finalize, and all direct-data-row counters are copied
    // through unchanged; every grouped counter is zero because no grouped
    // work ran.
    #[cfg(feature = "perf-attribution")]
    const fn scalar_query_execute_phase_attribution(
        phase: ScalarExecutePhaseAttribution,
    ) -> QueryExecutePhaseAttribution {
        QueryExecutePhaseAttribution {
            runtime_local_instructions: phase.runtime_local_instructions,
            finalize_local_instructions: phase.finalize_local_instructions,
            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
            direct_data_row_key_stream_local_instructions: phase
                .direct_data_row_key_stream_local_instructions,
            direct_data_row_row_read_local_instructions: phase
                .direct_data_row_row_read_local_instructions,
            direct_data_row_key_encode_local_instructions: phase
                .direct_data_row_key_encode_local_instructions,
            direct_data_row_store_get_local_instructions: phase
                .direct_data_row_store_get_local_instructions,
            direct_data_row_order_window_local_instructions: phase
                .direct_data_row_order_window_local_instructions,
            direct_data_row_page_window_local_instructions: phase
                .direct_data_row_page_window_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count_borrowed_hash_computations: 0,
            grouped_count_bucket_candidate_checks: 0,
            grouped_count_existing_group_hits: 0,
            grouped_count_new_group_inserts: 0,
        }
    }
262
    // Project the executor's grouped phase attribution into the session-local
    // shape. The generic runtime counter is the saturating sum of the stream
    // and fold phases, the finalize value is reported under both the generic
    // and the grouped finalize fields, and all direct-data-row counters are
    // zero.
    #[cfg(feature = "perf-attribution")]
    const fn grouped_query_execute_phase_attribution(
        phase: GroupedExecutePhaseAttribution,
    ) -> QueryExecutePhaseAttribution {
        QueryExecutePhaseAttribution {
            runtime_local_instructions: phase
                .stream_local_instructions
                .saturating_add(phase.fold_local_instructions),
            finalize_local_instructions: phase.finalize_local_instructions,
            direct_data_row_scan_local_instructions: 0,
            direct_data_row_key_stream_local_instructions: 0,
            direct_data_row_row_read_local_instructions: 0,
            direct_data_row_key_encode_local_instructions: 0,
            direct_data_row_store_get_local_instructions: 0,
            direct_data_row_order_window_local_instructions: 0,
            direct_data_row_page_window_local_instructions: 0,
            grouped_stream_local_instructions: phase.stream_local_instructions,
            grouped_fold_local_instructions: phase.fold_local_instructions,
            grouped_finalize_local_instructions: phase.finalize_local_instructions,
            grouped_count_borrowed_hash_computations: phase
                .grouped_count_borrowed_hash_computations,
            grouped_count_bucket_candidate_checks: phase.grouped_count_bucket_candidate_checks,
            grouped_count_existing_group_hits: phase.grouped_count_existing_group_hits,
            grouped_count_new_group_inserts: phase.grouped_count_new_group_inserts,
        }
    }
289
    // Scope id selecting this session's query-plan cache inside the
    // thread-local cache map; delegates to `self.db.cache_scope_id()`.
    fn query_plan_cache_scope_id(&self) -> usize {
        self.db.cache_scope_id()
    }
293
294    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
295        let scope_id = self.query_plan_cache_scope_id();
296
297        QUERY_PLAN_CACHES.with(|caches| {
298            let mut caches = caches.borrow_mut();
299            let cache = caches.entry(scope_id).or_default();
300
301            f(cache)
302        })
303    }
304
305    const fn visible_indexes_for_model(
306        model: &'static EntityModel,
307        visibility: QueryPlanVisibility,
308    ) -> VisibleIndexes<'static> {
309        match visibility {
310            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
311            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
312        }
313    }
314
    // Test-only: number of entries currently held in this session's scope of
    // the shared query-plan cache. Creates the (empty) scope cache if absent.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
        self.with_query_plan_cache(|cache| cache.len())
    }
319
    // Test-only: drop every entry from this session's scope of the shared
    // query-plan cache. Other scopes in the thread-local map are untouched.
    #[cfg(test)]
    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
        self.with_query_plan_cache(QueryPlanCache::clear);
    }
324
325    pub(in crate::db) fn query_plan_visibility_for_store_path(
326        &self,
327        store_path: &'static str,
328    ) -> Result<QueryPlanVisibility, QueryError> {
329        let store = self
330            .db
331            .recovered_store(store_path)
332            .map_err(QueryError::execute)?;
333        let visibility = if store.index_state() == crate::db::IndexState::Ready {
334            QueryPlanVisibility::StoreReady
335        } else {
336            QueryPlanVisibility::StoreNotReady
337        };
338
339        Ok(visibility)
340    }
341
342    pub(in crate::db) fn cached_query_plan_entry_for_authority(
343        &self,
344        authority: crate::db::executor::EntityAuthority,
345        schema_fingerprint: CommitSchemaFingerprint,
346        query: &StructuralQuery,
347    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
348        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
349        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
350        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
351        let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
352            authority,
353            schema_fingerprint,
354            visibility,
355            query,
356            normalized_predicate.as_ref(),
357        );
358
359        {
360            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
361            if let Some(entry) = cached {
362                return Ok((entry, QueryPlanCacheAttribution::hit()));
363            }
364        }
365
366        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
367            &visible_indexes,
368            normalized_predicate,
369        )?;
370        let entry = QueryPlanCacheEntry::new(
371            plan.clone(),
372            SharedPreparedExecutionPlan::from_plan(authority, plan),
373        );
374        self.with_query_plan_cache(|cache| {
375            cache.insert(cache_key, entry.clone());
376        });
377
378        Ok((entry, QueryPlanCacheAttribution::miss()))
379    }
380
    // Test-only: build one cache key with an explicit `cache_method_version`
    // by forwarding every argument unchanged to
    // `QueryPlanCacheKey::for_authority_with_method_version`.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_key_for_tests(
        authority: crate::db::executor::EntityAuthority,
        schema_fingerprint: CommitSchemaFingerprint,
        visibility: QueryPlanVisibility,
        query: &StructuralQuery,
        cache_method_version: u8,
    ) -> QueryPlanCacheKey {
        QueryPlanCacheKey::for_authority_with_method_version(
            authority,
            schema_fingerprint,
            visibility,
            query,
            cache_method_version,
        )
    }
397
398    pub(in crate::db) fn cached_structural_plan_for_authority(
399        &self,
400        authority: crate::db::executor::EntityAuthority,
401        schema_fingerprint: CommitSchemaFingerprint,
402        query: &StructuralQuery,
403    ) -> Result<AccessPlannedQuery, QueryError> {
404        let (entry, _) =
405            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
406
407        Ok(entry.logical_plan().clone())
408    }
409
410    // Resolve the planner-visible index slice for one typed query exactly once
411    // at the session boundary before handing execution/planning off to query-owned logic.
412    fn with_query_visible_indexes<E, T>(
413        &self,
414        query: &Query<E>,
415        op: impl FnOnce(
416            &Query<E>,
417            &crate::db::query::plan::VisibleIndexes<'static>,
418        ) -> Result<T, QueryError>,
419    ) -> Result<T, QueryError>
420    where
421        E: EntityKind<Canister = C>,
422    {
423        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
424        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
425
426        op(query, &visible_indexes)
427    }
428
429    // Resolve one typed structural query onto the shared lower plan cache so
430    // typed/fluent callers do not each duplicate the entity metadata plumbing.
431    fn cached_structural_plan_for_entity<E>(
432        &self,
433        query: &StructuralQuery,
434    ) -> Result<AccessPlannedQuery, QueryError>
435    where
436        E: EntityKind<Canister = C>,
437    {
438        self.cached_structural_plan_for_authority(
439            crate::db::executor::EntityAuthority::for_type::<E>(),
440            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
441            query,
442        )
443    }
444
445    // Resolve one cached structural plan for the typed entity and then
446    // project it into one caller-owned wrapper so planned vs compiled session
447    // surfaces do not each duplicate the same cache lookup and entity wiring.
448    fn map_cached_structural_plan_for_entity<E, T>(
449        &self,
450        query: &StructuralQuery,
451        map: impl FnOnce(AccessPlannedQuery) -> T,
452    ) -> Result<T, QueryError>
453    where
454        E: EntityKind<Canister = C>,
455    {
456        let plan = self.cached_structural_plan_for_entity::<E>(query)?;
457
458        Ok(map(plan))
459    }
460
461    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
462        &self,
463        query: &StructuralQuery,
464    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
465    where
466        E: EntityKind<Canister = C>,
467    {
468        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
469            crate::db::executor::EntityAuthority::for_type::<E>(),
470            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
471            query,
472        )?;
473
474        Ok((entry.typed_prepared_plan::<E>(), attribution))
475    }
476
477    // Compile one typed query using only the indexes currently visible for the
478    // query's recovered store.
479    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
480        &self,
481        query: &Query<E>,
482    ) -> Result<CompiledQuery<E>, QueryError>
483    where
484        E: EntityKind<Canister = C>,
485    {
486        self.map_cached_structural_plan_for_entity::<E, _>(
487            query.structural(),
488            Query::<E>::compiled_query_from_plan,
489        )
490    }
491
492    // Build one logical planned-query shell using only the indexes currently
493    // visible for the query's recovered store.
494    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
495        &self,
496        query: &Query<E>,
497    ) -> Result<PlannedQuery<E>, QueryError>
498    where
499        E: EntityKind<Canister = C>,
500    {
501        self.map_cached_structural_plan_for_entity::<E, _>(
502            query.structural(),
503            Query::<E>::planned_query_from_plan,
504        )
505    }
506
    // Project one logical explain payload using only planner-visible indexes.
    // Visibility is resolved for `E`'s store by `with_query_visible_indexes`,
    // then `Query::explain_with_visible_indexes` produces the `ExplainPlan`.
    // This wrapper itself does not consult the shared query-plan cache.
    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainPlan, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
    }
517
    // Hash one typed query plan using only the indexes currently visible for
    // the query's recovered store. Visibility is resolved first; the hash
    // string itself comes from `Query::plan_hash_hex_with_visible_indexes`.
    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
    }
529
    // Explain one load execution shape using only planner-visible
    // indexes from the recovered store state. Visibility is resolved first;
    // the descriptor itself comes from
    // `Query::explain_execution_with_visible_indexes`.
    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
    }
541
    // Render one load execution descriptor plus route diagnostics using
    // only planner-visible indexes from the recovered store state.
    // Visibility is resolved first; the rendered text itself comes from
    // `Query::explain_execution_verbose_with_visible_indexes`.
    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(
            query,
            Query::<E>::explain_execution_verbose_with_visible_indexes,
        )
    }
556
557    // Explain one prepared fluent aggregate terminal using only
558    // planner-visible indexes from the recovered store state.
559    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
560        &self,
561        query: &Query<E>,
562        strategy: &S,
563    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
564    where
565        E: EntityValue + EntityKind<Canister = C>,
566        S: PreparedFluentAggregateExplainStrategy,
567    {
568        self.with_query_visible_indexes(query, |query, visible_indexes| {
569            query
570                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
571        })
572    }
573
574    // Explain one `bytes_by(field)` terminal using only planner-visible
575    // indexes from the recovered store state.
576    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
577        &self,
578        query: &Query<E>,
579        target_field: &str,
580    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
581    where
582        E: EntityValue + EntityKind<Canister = C>,
583    {
584        self.with_query_visible_indexes(query, |query, visible_indexes| {
585            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
586        })
587    }
588
589    // Explain one prepared fluent projection terminal using only
590    // planner-visible indexes from the recovered store state.
591    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
592        &self,
593        query: &Query<E>,
594        strategy: &PreparedFluentProjectionStrategy,
595    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
596    where
597        E: EntityValue + EntityKind<Canister = C>,
598    {
599        self.with_query_visible_indexes(query, |query, visible_indexes| {
600            query.explain_prepared_projection_terminal_with_visible_indexes(
601                visible_indexes,
602                strategy,
603            )
604        })
605    }
606
607    // Validate that one execution strategy is admissible for scalar paged load
608    // execution and fail closed on grouped/primary-key-only routes.
609    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
610        match family {
611            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
612                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
613            )),
614            ExecutionFamily::Ordered => Ok(()),
615            ExecutionFamily::Grouped => Err(QueryError::invariant(
616                "grouped queries execute via execute(), not page().execute()",
617            )),
618        }
619    }
620
621    // Validate that one execution strategy is admissible for the grouped
622    // execution surface.
623    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
624        match family {
625            ExecutionFamily::Grouped => Ok(()),
626            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
627                "grouped execution requires grouped logical plans",
628            )),
629        }
630    }
631
632    // Finalize one grouped cursor page into the outward grouped execution
633    // payload so grouped cursor encoding and continuation-shape validation
634    // stay owned by the session boundary.
635    fn finalize_grouped_execution_page(
636        page: GroupedCursorPage,
637        trace: Option<ExecutionTrace>,
638    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
639        let next_cursor = page
640            .next_cursor
641            .map(|token| {
642                let Some(token) = token.as_grouped() else {
643                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
644                };
645
646                token.encode().map_err(|err| {
647                    QueryError::serialize_internal(format!(
648                        "failed to serialize grouped continuation cursor: {err}"
649                    ))
650                })
651            })
652            .transpose()?;
653
654        Ok(PagedGroupedExecutionWithTrace::new(
655            page.rows,
656            next_cursor,
657            trace,
658        ))
659    }
660
661    // Execute one prepared grouped query result and finalize the outward
662    // grouped payload at the session boundary so fluent callers do not
663    // reassemble grouped result wrappers themselves.
664    fn execute_grouped_query_result<E>(
665        &self,
666        query: &Query<E>,
667        cursor_token: Option<&str>,
668    ) -> Result<LoadQueryResult<E>, QueryError>
669    where
670        E: PersistedRow<Canister = C> + EntityValue,
671    {
672        self.execute_grouped(query, cursor_token)
673            .map(LoadQueryResult::grouped)
674    }
675
676    // Execute one prepared grouped query result while returning the same empty
677    // scalar-attribution shell used by the grouped execution path in the
678    // perf-attribution surface.
679    #[cfg(feature = "perf-attribution")]
680    fn execute_grouped_query_result_with_attribution<E>(
681        &self,
682        plan: PreparedExecutionPlan<E>,
683    ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
684    where
685        E: PersistedRow<Canister = C> + EntityValue,
686    {
687        let (page, trace, phase_attribution) =
688            self.execute_grouped_plan_with_trace_with_phase_attribution(plan, None)?;
689        let grouped = Self::finalize_grouped_execution_page(page, trace)?;
690
691        Ok((
692            LoadQueryResult::grouped(grouped),
693            Self::grouped_query_execute_phase_attribution(phase_attribution),
694            0,
695        ))
696    }
697
698    // Execute one non-grouped prepared query result while normalizing load vs
699    // delete output into the shared outward load-result contract plus the
700    // scalar execution attribution tuple expected by the perf surface.
701    #[cfg(feature = "perf-attribution")]
702    fn execute_scalar_query_result_with_attribution<E>(
703        &self,
704        mode: QueryMode,
705        plan: PreparedExecutionPlan<E>,
706    ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
707    where
708        E: PersistedRow<Canister = C> + EntityValue,
709    {
710        match mode {
711            QueryMode::Load(_) => {
712                let (rows, phase_attribution, response_decode_local_instructions) = self
713                    .load_executor::<E>()
714                    .execute_with_phase_attribution(plan)
715                    .map_err(QueryError::execute)?;
716
717                Ok((
718                    LoadQueryResult::rows(rows),
719                    Self::scalar_query_execute_phase_attribution(phase_attribution),
720                    response_decode_local_instructions,
721                ))
722            }
723            QueryMode::Delete(_) => {
724                let result = self.execute_query_dyn(mode, plan)?;
725
726                Ok((
727                    LoadQueryResult::rows(result),
728                    Self::empty_query_execute_phase_attribution(),
729                    0,
730                ))
731            }
732        }
733    }
734
735    /// Execute one scalar load/delete query and return materialized response rows.
736    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
737    where
738        E: PersistedRow<Canister = C> + EntityValue,
739    {
740        // Phase 1: compile typed intent into one prepared execution-plan contract.
741        let mode = query.mode();
742        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
743
744        // Phase 2: delegate execution to the shared compiled-plan entry path.
745        self.execute_query_dyn(mode, plan)
746    }
747
748    /// Execute one typed query while reporting the compile/execute split at
749    /// the shared fluent query seam.
750    #[cfg(feature = "perf-attribution")]
751    #[doc(hidden)]
752    pub fn execute_query_result_with_attribution<E>(
753        &self,
754        query: &Query<E>,
755    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
756    where
757        E: PersistedRow<Canister = C> + EntityValue,
758    {
759        // Phase 1: measure compile work at the typed/fluent boundary,
760        // including the shared lower query-plan cache lookup/build exactly
761        // once. This preserves honest hit/miss attribution without
762        // double-building plans on one-shot cache misses.
763        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
764            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
765        });
766        let (plan, cache_attribution) = plan_and_cache?;
767
768        // Phase 2: execute one query result using the prepared plan produced
769        // by the compile/cache boundary above.
770        let (execute_local_instructions, result) = measure_query_stage(|| {
771            if query.has_grouping() {
772                self.execute_grouped_query_result_with_attribution(plan)
773            } else {
774                self.execute_scalar_query_result_with_attribution(query.mode(), plan)
775            }
776        });
777        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
778        let total_local_instructions =
779            compile_local_instructions.saturating_add(execute_local_instructions);
780
781        Ok((
782            result,
783            QueryExecutionAttribution {
784                compile_local_instructions,
785                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
786                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
787                direct_data_row_scan_local_instructions: execute_phase_attribution
788                    .direct_data_row_scan_local_instructions,
789                direct_data_row_key_stream_local_instructions: execute_phase_attribution
790                    .direct_data_row_key_stream_local_instructions,
791                direct_data_row_row_read_local_instructions: execute_phase_attribution
792                    .direct_data_row_row_read_local_instructions,
793                direct_data_row_key_encode_local_instructions: execute_phase_attribution
794                    .direct_data_row_key_encode_local_instructions,
795                direct_data_row_store_get_local_instructions: execute_phase_attribution
796                    .direct_data_row_store_get_local_instructions,
797                direct_data_row_order_window_local_instructions: execute_phase_attribution
798                    .direct_data_row_order_window_local_instructions,
799                direct_data_row_page_window_local_instructions: execute_phase_attribution
800                    .direct_data_row_page_window_local_instructions,
801                grouped_stream_local_instructions: execute_phase_attribution
802                    .grouped_stream_local_instructions,
803                grouped_fold_local_instructions: execute_phase_attribution
804                    .grouped_fold_local_instructions,
805                grouped_finalize_local_instructions: execute_phase_attribution
806                    .grouped_finalize_local_instructions,
807                grouped_count_borrowed_hash_computations: execute_phase_attribution
808                    .grouped_count_borrowed_hash_computations,
809                grouped_count_bucket_candidate_checks: execute_phase_attribution
810                    .grouped_count_bucket_candidate_checks,
811                grouped_count_existing_group_hits: execute_phase_attribution
812                    .grouped_count_existing_group_hits,
813                grouped_count_new_group_inserts: execute_phase_attribution
814                    .grouped_count_new_group_inserts,
815                response_decode_local_instructions,
816                execute_local_instructions,
817                total_local_instructions,
818                shared_query_plan_cache_hits: cache_attribution.hits,
819                shared_query_plan_cache_misses: cache_attribution.misses,
820            },
821        ))
822    }
823
824    // Execute one typed query through the unified row/grouped result surface so
825    // higher layers do not need to branch on grouped shape themselves.
826    #[doc(hidden)]
827    pub fn execute_query_result<E>(
828        &self,
829        query: &Query<E>,
830    ) -> Result<LoadQueryResult<E>, QueryError>
831    where
832        E: PersistedRow<Canister = C> + EntityValue,
833    {
834        if query.has_grouping() {
835            return self.execute_grouped_query_result(query, None);
836        }
837
838        self.execute_query(query).map(LoadQueryResult::rows)
839    }
840
841    /// Execute one typed delete query and return only the affected-row count.
842    #[doc(hidden)]
843    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
844    where
845        E: PersistedRow<Canister = C> + EntityValue,
846    {
847        // Phase 1: fail closed if the caller routes a non-delete query here.
848        if !query.mode().is_delete() {
849            return Err(QueryError::unsupported_query(
850                "delete count execution requires delete query mode",
851            ));
852        }
853
854        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
855        let plan = self
856            .compile_query_with_visible_indexes(query)?
857            .into_prepared_execution_plan();
858
859        // Phase 3: execute the shared delete core while skipping response-row materialization.
860        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
861            .map_err(QueryError::execute)
862    }
863
864    /// Execute one scalar query from one pre-built prepared execution contract.
865    ///
866    /// This is the shared compiled-plan entry boundary used by the typed
867    /// `execute_query(...)` surface and adjacent query execution facades.
868    pub(in crate::db) fn execute_query_dyn<E>(
869        &self,
870        mode: QueryMode,
871        plan: PreparedExecutionPlan<E>,
872    ) -> Result<EntityResponse<E>, QueryError>
873    where
874        E: PersistedRow<Canister = C> + EntityValue,
875    {
876        let result = match mode {
877            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
878            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
879        };
880
881        result.map_err(QueryError::execute)
882    }
883
884    // Shared load-query terminal wrapper: build plan, run under metrics, map
885    // execution errors into query-facing errors.
886    pub(in crate::db) fn execute_load_query_with<E, T>(
887        &self,
888        query: &Query<E>,
889        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
890    ) -> Result<T, QueryError>
891    where
892        E: PersistedRow<Canister = C> + EntityValue,
893    {
894        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
895
896        self.with_metrics(|| op(self.load_executor::<E>(), plan))
897            .map_err(QueryError::execute)
898    }
899
900    /// Build one trace payload for a query without executing it.
901    ///
902    /// This lightweight surface is intended for developer diagnostics:
903    /// plan hash, access strategy summary, and planner/executor route shape.
904    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
905    where
906        E: EntityKind<Canister = C>,
907    {
908        let compiled = self.compile_query_with_visible_indexes(query)?;
909        let explain = compiled.explain();
910        let plan_hash = compiled.plan_hash_hex();
911
912        let (executable, _) =
913            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
914        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
915        let execution_family = match query.mode() {
916            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
917            QueryMode::Delete(_) => None,
918        };
919
920        Ok(QueryTracePlan::new(
921            plan_hash,
922            access_strategy,
923            execution_family,
924            explain,
925        ))
926    }
927
928    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
929    pub(crate) fn execute_load_query_paged_with_trace<E>(
930        &self,
931        query: &Query<E>,
932        cursor_token: Option<&str>,
933    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
934    where
935        E: PersistedRow<Canister = C> + EntityValue,
936    {
937        // Phase 1: build/validate prepared execution plan and reject grouped plans.
938        let plan = self
939            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
940            .0;
941        Self::ensure_scalar_paged_execution_family(
942            plan.execution_family().map_err(QueryError::execute)?,
943        )?;
944
945        // Phase 2: decode external cursor token and validate it against plan surface.
946        let cursor_bytes = decode_optional_cursor_token(cursor_token)
947            .map_err(QueryError::from_cursor_plan_error)?;
948        let cursor = plan
949            .prepare_cursor(cursor_bytes.as_deref())
950            .map_err(QueryError::from_executor_plan_error)?;
951
952        // Phase 3: execute one traced page and encode outbound continuation token.
953        let (page, trace) = self
954            .with_metrics(|| {
955                self.load_executor::<E>()
956                    .execute_paged_with_cursor_traced(plan, cursor)
957            })
958            .map_err(QueryError::execute)?;
959        let next_cursor = page
960            .next_cursor
961            .map(|token| {
962                let Some(token) = token.as_scalar() else {
963                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
964                };
965
966                token.encode().map_err(|err| {
967                    QueryError::serialize_internal(format!(
968                        "failed to serialize continuation cursor: {err}"
969                    ))
970                })
971            })
972            .transpose()?;
973
974        Ok(PagedLoadExecutionWithTrace::new(
975            page.items,
976            next_cursor,
977            trace,
978        ))
979    }
980
981    /// Execute one grouped query page with optional grouped continuation cursor.
982    ///
983    /// This is the explicit grouped execution boundary; scalar load APIs reject
984    /// grouped plans to preserve scalar response contracts.
985    pub(in crate::db) fn execute_grouped<E>(
986        &self,
987        query: &Query<E>,
988        cursor_token: Option<&str>,
989    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
990    where
991        E: PersistedRow<Canister = C> + EntityValue,
992    {
993        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
994        Self::finalize_grouped_execution_page(page, trace)
995    }
996
997    // Execute the canonical grouped query core and return the raw grouped page
998    // plus optional execution trace before outward cursor formatting.
999    fn execute_grouped_page_with_trace<E>(
1000        &self,
1001        query: &Query<E>,
1002        cursor_token: Option<&str>,
1003    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1004    where
1005        E: PersistedRow<Canister = C> + EntityValue,
1006    {
1007        // Phase 1: build the prepared execution plan once from the typed query.
1008        let plan = self
1009            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
1010            .0;
1011
1012        // Phase 2: reuse the shared prepared grouped execution path.
1013        self.execute_grouped_plan_with_trace(plan, cursor_token)
1014    }
1015
1016    // Execute one grouped prepared plan page with optional grouped cursor.
1017    fn execute_grouped_plan_with_trace<E>(
1018        &self,
1019        plan: PreparedExecutionPlan<E>,
1020        cursor_token: Option<&str>,
1021    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1022    where
1023        E: PersistedRow<Canister = C> + EntityValue,
1024    {
1025        // Phase 1: validate the prepared plan shape before decoding cursors.
1026        Self::ensure_grouped_execution_family(
1027            plan.execution_family().map_err(QueryError::execute)?,
1028        )?;
1029
1030        // Phase 2: decode external grouped cursor token and validate against plan.
1031        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1032            .map_err(QueryError::from_cursor_plan_error)?;
1033        let cursor = plan
1034            .prepare_grouped_cursor_token(cursor)
1035            .map_err(QueryError::from_executor_plan_error)?;
1036
1037        // Phase 3: execute one grouped page while preserving the structural
1038        // grouped cursor payload for whichever outward cursor format the caller needs.
1039        self.with_metrics(|| {
1040            self.load_executor::<E>()
1041                .execute_grouped_paged_with_cursor_traced(plan, cursor)
1042        })
1043        .map_err(QueryError::execute)
1044    }
1045
1046    #[cfg(feature = "perf-attribution")]
1047    fn execute_grouped_plan_with_trace_with_phase_attribution<E>(
1048        &self,
1049        plan: PreparedExecutionPlan<E>,
1050        cursor_token: Option<&str>,
1051    ) -> Result<
1052        (
1053            GroupedCursorPage,
1054            Option<ExecutionTrace>,
1055            GroupedExecutePhaseAttribution,
1056        ),
1057        QueryError,
1058    >
1059    where
1060        E: PersistedRow<Canister = C> + EntityValue,
1061    {
1062        Self::ensure_grouped_execution_family(
1063            plan.execution_family().map_err(QueryError::execute)?,
1064        )?;
1065
1066        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1067            .map_err(QueryError::from_cursor_plan_error)?;
1068        let cursor = plan
1069            .prepare_grouped_cursor_token(cursor)
1070            .map_err(QueryError::from_executor_plan_error)?;
1071
1072        self.with_metrics(|| {
1073            self.load_executor::<E>()
1074                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(plan, cursor)
1075        })
1076        .map_err(QueryError::execute)
1077    }
1078}
1079
1080impl QueryPlanCacheKey {
1081    fn for_authority_with_normalized_predicate(
1082        authority: crate::db::executor::EntityAuthority,
1083        schema_fingerprint: CommitSchemaFingerprint,
1084        visibility: QueryPlanVisibility,
1085        query: &StructuralQuery,
1086        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1087    ) -> Self {
1088        Self::for_authority_with_normalized_predicate_and_method_version(
1089            authority,
1090            schema_fingerprint,
1091            visibility,
1092            query,
1093            normalized_predicate,
1094            SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
1095        )
1096    }
1097
1098    // Assemble the canonical cache-key shell once so the test and
1099    // normalized-predicate constructors only decide which structural query key
1100    // they feed into the shared session cache identity.
1101    const fn from_authority_parts(
1102        authority: crate::db::executor::EntityAuthority,
1103        schema_fingerprint: CommitSchemaFingerprint,
1104        visibility: QueryPlanVisibility,
1105        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1106        cache_method_version: u8,
1107    ) -> Self {
1108        Self {
1109            cache_method_version,
1110            entity_path: authority.entity_path(),
1111            schema_fingerprint,
1112            visibility,
1113            structural_query,
1114        }
1115    }
1116
1117    #[cfg(test)]
1118    fn for_authority_with_method_version(
1119        authority: crate::db::executor::EntityAuthority,
1120        schema_fingerprint: CommitSchemaFingerprint,
1121        visibility: QueryPlanVisibility,
1122        query: &StructuralQuery,
1123        cache_method_version: u8,
1124    ) -> Self {
1125        Self::from_authority_parts(
1126            authority,
1127            schema_fingerprint,
1128            visibility,
1129            query.structural_cache_key(),
1130            cache_method_version,
1131        )
1132    }
1133
1134    fn for_authority_with_normalized_predicate_and_method_version(
1135        authority: crate::db::executor::EntityAuthority,
1136        schema_fingerprint: CommitSchemaFingerprint,
1137        visibility: QueryPlanVisibility,
1138        query: &StructuralQuery,
1139        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1140        cache_method_version: u8,
1141    ) -> Self {
1142        Self::from_authority_parts(
1143            authority,
1144            schema_fingerprint,
1145            visibility,
1146            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1147            cache_method_version,
1148        )
1149    }
1150}