Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        access::AccessStrategy,
16        commit::CommitSchemaFingerprint,
17        cursor::{
18            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19        },
20        diagnostics::ExecutionTrace,
21        executor::{
22            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23            SharedPreparedExecutionPlan,
24        },
25        query::builder::{
26            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
27        },
28        query::explain::{
29            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
30        },
31        query::{
32            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
33            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
34        },
35    },
36    error::InternalError,
37    model::entity::EntityModel,
38    traits::{CanisterKind, EntityKind, EntityValue, Path},
39};
40#[cfg(feature = "diagnostics")]
41use candid::CandidType;
42use icydb_utils::Xxh3;
43#[cfg(feature = "diagnostics")]
44use serde::Deserialize;
45use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
46
47type CacheBuildHasher = BuildHasherDefault<Xxh3>;
48
49// Bump this when the shared lower query-plan cache key meaning changes in a
50// way that must force old in-heap entries to miss instead of aliasing.
51const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
52
53#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
54pub(in crate::db) enum QueryPlanVisibility {
55    StoreNotReady,
56    StoreReady,
57}
58
59#[derive(Clone, Debug, Eq, Hash, PartialEq)]
60pub(in crate::db) struct QueryPlanCacheKey {
61    cache_method_version: u8,
62    entity_path: &'static str,
63    schema_fingerprint: CommitSchemaFingerprint,
64    visibility: QueryPlanVisibility,
65    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
66}
67
68#[derive(Clone, Debug)]
69pub(in crate::db) struct QueryPlanCacheEntry {
70    logical_plan: AccessPlannedQuery,
71    prepared_plan: SharedPreparedExecutionPlan,
72}
73
74impl QueryPlanCacheEntry {
75    #[must_use]
76    pub(in crate::db) const fn new(
77        logical_plan: AccessPlannedQuery,
78        prepared_plan: SharedPreparedExecutionPlan,
79    ) -> Self {
80        Self {
81            logical_plan,
82            prepared_plan,
83        }
84    }
85
86    #[must_use]
87    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
88        &self.logical_plan
89    }
90
91    #[must_use]
92    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
93        self.prepared_plan.typed_clone::<E>()
94    }
95
96    #[must_use]
97    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
98        &self.prepared_plan
99    }
100}
101
102pub(in crate::db) type QueryPlanCache =
103    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
104
105thread_local! {
106    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
107    // facades can share prepared logical plans across update/query calls while
108    // tests and multi-registry host processes remain isolated by registry
109    // identity.
110    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
111        RefCell::new(HashMap::default());
112}
113
114#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
115pub(in crate::db) struct QueryPlanCacheAttribution {
116    pub hits: u64,
117    pub misses: u64,
118}
119
120impl QueryPlanCacheAttribution {
121    #[must_use]
122    const fn hit() -> Self {
123        Self { hits: 1, misses: 0 }
124    }
125
126    #[must_use]
127    const fn miss() -> Self {
128        Self { hits: 0, misses: 1 }
129    }
130}
131
132///
133/// QueryExecutionAttribution
134///
135/// QueryExecutionAttribution records the top-level compile/execute split for
136/// typed/fluent query execution at the session boundary.
137///
138#[cfg(feature = "diagnostics")]
139#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
140pub struct QueryExecutionAttribution {
141    pub compile_local_instructions: u64,
142    pub runtime_local_instructions: u64,
143    pub finalize_local_instructions: u64,
144    pub direct_data_row_scan_local_instructions: u64,
145    pub direct_data_row_key_stream_local_instructions: u64,
146    pub direct_data_row_row_read_local_instructions: u64,
147    pub direct_data_row_key_encode_local_instructions: u64,
148    pub direct_data_row_store_get_local_instructions: u64,
149    pub direct_data_row_order_window_local_instructions: u64,
150    pub direct_data_row_page_window_local_instructions: u64,
151    pub grouped_stream_local_instructions: u64,
152    pub grouped_fold_local_instructions: u64,
153    pub grouped_finalize_local_instructions: u64,
154    pub grouped_count_borrowed_hash_computations: u64,
155    pub grouped_count_bucket_candidate_checks: u64,
156    pub grouped_count_existing_group_hits: u64,
157    pub grouped_count_new_group_inserts: u64,
158    pub grouped_count_row_materialization_local_instructions: u64,
159    pub grouped_count_group_lookup_local_instructions: u64,
160    pub grouped_count_existing_group_update_local_instructions: u64,
161    pub grouped_count_new_group_insert_local_instructions: u64,
162    pub response_decode_local_instructions: u64,
163    pub execute_local_instructions: u64,
164    pub total_local_instructions: u64,
165    pub shared_query_plan_cache_hits: u64,
166    pub shared_query_plan_cache_misses: u64,
167}
168
169#[cfg(feature = "diagnostics")]
170#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
171struct QueryExecutePhaseAttribution {
172    runtime_local_instructions: u64,
173    finalize_local_instructions: u64,
174    direct_data_row_scan_local_instructions: u64,
175    direct_data_row_key_stream_local_instructions: u64,
176    direct_data_row_row_read_local_instructions: u64,
177    direct_data_row_key_encode_local_instructions: u64,
178    direct_data_row_store_get_local_instructions: u64,
179    direct_data_row_order_window_local_instructions: u64,
180    direct_data_row_page_window_local_instructions: u64,
181    grouped_stream_local_instructions: u64,
182    grouped_fold_local_instructions: u64,
183    grouped_finalize_local_instructions: u64,
184    grouped_count: GroupedCountAttribution,
185}
186
187#[cfg(feature = "diagnostics")]
188#[expect(
189    clippy::missing_const_for_fn,
190    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
191)]
192fn read_query_local_instruction_counter() -> u64 {
193    #[cfg(target_arch = "wasm32")]
194    {
195        canic_cdk::api::performance_counter(1)
196    }
197
198    #[cfg(not(target_arch = "wasm32"))]
199    {
200        0
201    }
202}
203
204#[cfg(feature = "diagnostics")]
205fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
206    let start = read_query_local_instruction_counter();
207    let result = run();
208    let delta = read_query_local_instruction_counter().saturating_sub(start);
209
210    (delta, result)
211}
212
213impl<C: CanisterKind> DbSession<C> {
214    #[cfg(feature = "diagnostics")]
215    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
216        QueryExecutePhaseAttribution {
217            runtime_local_instructions: 0,
218            finalize_local_instructions: 0,
219            direct_data_row_scan_local_instructions: 0,
220            direct_data_row_key_stream_local_instructions: 0,
221            direct_data_row_row_read_local_instructions: 0,
222            direct_data_row_key_encode_local_instructions: 0,
223            direct_data_row_store_get_local_instructions: 0,
224            direct_data_row_order_window_local_instructions: 0,
225            direct_data_row_page_window_local_instructions: 0,
226            grouped_stream_local_instructions: 0,
227            grouped_fold_local_instructions: 0,
228            grouped_finalize_local_instructions: 0,
229            grouped_count: GroupedCountAttribution::none(),
230        }
231    }
232
233    #[cfg(feature = "diagnostics")]
234    const fn scalar_query_execute_phase_attribution(
235        phase: ScalarExecutePhaseAttribution,
236    ) -> QueryExecutePhaseAttribution {
237        QueryExecutePhaseAttribution {
238            runtime_local_instructions: phase.runtime_local_instructions,
239            finalize_local_instructions: phase.finalize_local_instructions,
240            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
241            direct_data_row_key_stream_local_instructions: phase
242                .direct_data_row_key_stream_local_instructions,
243            direct_data_row_row_read_local_instructions: phase
244                .direct_data_row_row_read_local_instructions,
245            direct_data_row_key_encode_local_instructions: phase
246                .direct_data_row_key_encode_local_instructions,
247            direct_data_row_store_get_local_instructions: phase
248                .direct_data_row_store_get_local_instructions,
249            direct_data_row_order_window_local_instructions: phase
250                .direct_data_row_order_window_local_instructions,
251            direct_data_row_page_window_local_instructions: phase
252                .direct_data_row_page_window_local_instructions,
253            grouped_stream_local_instructions: 0,
254            grouped_fold_local_instructions: 0,
255            grouped_finalize_local_instructions: 0,
256            grouped_count: GroupedCountAttribution::none(),
257        }
258    }
259
260    #[cfg(feature = "diagnostics")]
261    const fn grouped_query_execute_phase_attribution(
262        phase: GroupedExecutePhaseAttribution,
263    ) -> QueryExecutePhaseAttribution {
264        QueryExecutePhaseAttribution {
265            runtime_local_instructions: phase
266                .stream_local_instructions
267                .saturating_add(phase.fold_local_instructions),
268            finalize_local_instructions: phase.finalize_local_instructions,
269            direct_data_row_scan_local_instructions: 0,
270            direct_data_row_key_stream_local_instructions: 0,
271            direct_data_row_row_read_local_instructions: 0,
272            direct_data_row_key_encode_local_instructions: 0,
273            direct_data_row_store_get_local_instructions: 0,
274            direct_data_row_order_window_local_instructions: 0,
275            direct_data_row_page_window_local_instructions: 0,
276            grouped_stream_local_instructions: phase.stream_local_instructions,
277            grouped_fold_local_instructions: phase.fold_local_instructions,
278            grouped_finalize_local_instructions: phase.finalize_local_instructions,
279            grouped_count: phase.grouped_count,
280        }
281    }
282
283    fn query_plan_cache_scope_id(&self) -> usize {
284        self.db.cache_scope_id()
285    }
286
287    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
288        let scope_id = self.query_plan_cache_scope_id();
289
290        QUERY_PLAN_CACHES.with(|caches| {
291            let mut caches = caches.borrow_mut();
292            let cache = caches.entry(scope_id).or_default();
293
294            f(cache)
295        })
296    }
297
298    const fn visible_indexes_for_model(
299        model: &'static EntityModel,
300        visibility: QueryPlanVisibility,
301    ) -> VisibleIndexes<'static> {
302        match visibility {
303            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
304            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
305        }
306    }
307
308    #[cfg(test)]
309    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
310        self.with_query_plan_cache(|cache| cache.len())
311    }
312
313    #[cfg(test)]
314    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
315        self.with_query_plan_cache(QueryPlanCache::clear);
316    }
317
318    pub(in crate::db) fn query_plan_visibility_for_store_path(
319        &self,
320        store_path: &'static str,
321    ) -> Result<QueryPlanVisibility, QueryError> {
322        let store = self
323            .db
324            .recovered_store(store_path)
325            .map_err(QueryError::execute)?;
326        let visibility = if store.index_state() == crate::db::IndexState::Ready {
327            QueryPlanVisibility::StoreReady
328        } else {
329            QueryPlanVisibility::StoreNotReady
330        };
331
332        Ok(visibility)
333    }
334
335    pub(in crate::db) fn cached_query_plan_entry_for_authority(
336        &self,
337        authority: crate::db::executor::EntityAuthority,
338        schema_fingerprint: CommitSchemaFingerprint,
339        query: &StructuralQuery,
340    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
341        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
342        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
343        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
344        let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
345            authority,
346            schema_fingerprint,
347            visibility,
348            query,
349            normalized_predicate.as_ref(),
350        );
351
352        {
353            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
354            if let Some(entry) = cached {
355                return Ok((entry, QueryPlanCacheAttribution::hit()));
356            }
357        }
358
359        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
360            &visible_indexes,
361            normalized_predicate,
362        )?;
363        let entry = QueryPlanCacheEntry::new(
364            plan.clone(),
365            SharedPreparedExecutionPlan::from_plan(authority, plan),
366        );
367        self.with_query_plan_cache(|cache| {
368            cache.insert(cache_key, entry.clone());
369        });
370
371        Ok((entry, QueryPlanCacheAttribution::miss()))
372    }
373
374    #[cfg(test)]
375    pub(in crate::db) fn query_plan_cache_key_for_tests(
376        authority: crate::db::executor::EntityAuthority,
377        schema_fingerprint: CommitSchemaFingerprint,
378        visibility: QueryPlanVisibility,
379        query: &StructuralQuery,
380        cache_method_version: u8,
381    ) -> QueryPlanCacheKey {
382        QueryPlanCacheKey::for_authority_with_method_version(
383            authority,
384            schema_fingerprint,
385            visibility,
386            query,
387            cache_method_version,
388        )
389    }
390
391    pub(in crate::db) fn cached_structural_plan_for_authority(
392        &self,
393        authority: crate::db::executor::EntityAuthority,
394        schema_fingerprint: CommitSchemaFingerprint,
395        query: &StructuralQuery,
396    ) -> Result<AccessPlannedQuery, QueryError> {
397        let (entry, _) =
398            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
399
400        Ok(entry.logical_plan().clone())
401    }
402
403    // Resolve the planner-visible index slice for one typed query exactly once
404    // at the session boundary before handing execution/planning off to query-owned logic.
405    fn with_query_visible_indexes<E, T>(
406        &self,
407        query: &Query<E>,
408        op: impl FnOnce(
409            &Query<E>,
410            &crate::db::query::plan::VisibleIndexes<'static>,
411        ) -> Result<T, QueryError>,
412    ) -> Result<T, QueryError>
413    where
414        E: EntityKind<Canister = C>,
415    {
416        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
417        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
418
419        op(query, &visible_indexes)
420    }
421
422    // Resolve one typed structural query onto the shared lower plan cache so
423    // typed/fluent callers do not each duplicate the entity metadata plumbing.
424    fn cached_structural_plan_for_entity<E>(
425        &self,
426        query: &StructuralQuery,
427    ) -> Result<AccessPlannedQuery, QueryError>
428    where
429        E: EntityKind<Canister = C>,
430    {
431        self.cached_structural_plan_for_authority(
432            crate::db::executor::EntityAuthority::for_type::<E>(),
433            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
434            query,
435        )
436    }
437
438    // Resolve one cached structural plan for the typed entity and then
439    // project it into one caller-owned wrapper so planned vs compiled session
440    // surfaces do not each duplicate the same cache lookup and entity wiring.
441    fn map_cached_structural_plan_for_entity<E, T>(
442        &self,
443        query: &StructuralQuery,
444        map: impl FnOnce(AccessPlannedQuery) -> T,
445    ) -> Result<T, QueryError>
446    where
447        E: EntityKind<Canister = C>,
448    {
449        let plan = self.cached_structural_plan_for_entity::<E>(query)?;
450
451        Ok(map(plan))
452    }
453
454    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
455        &self,
456        query: &StructuralQuery,
457    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
458    where
459        E: EntityKind<Canister = C>,
460    {
461        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
462            crate::db::executor::EntityAuthority::for_type::<E>(),
463            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
464            query,
465        )?;
466
467        Ok((entry.typed_prepared_plan::<E>(), attribution))
468    }
469
470    // Compile one typed query using only the indexes currently visible for the
471    // query's recovered store.
472    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
473        &self,
474        query: &Query<E>,
475    ) -> Result<CompiledQuery<E>, QueryError>
476    where
477        E: EntityKind<Canister = C>,
478    {
479        self.map_cached_structural_plan_for_entity::<E, _>(
480            query.structural(),
481            Query::<E>::compiled_query_from_plan,
482        )
483    }
484
485    // Build one logical planned-query shell using only the indexes currently
486    // visible for the query's recovered store.
487    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
488        &self,
489        query: &Query<E>,
490    ) -> Result<PlannedQuery<E>, QueryError>
491    where
492        E: EntityKind<Canister = C>,
493    {
494        self.map_cached_structural_plan_for_entity::<E, _>(
495            query.structural(),
496            Query::<E>::planned_query_from_plan,
497        )
498    }
499
500    // Project one logical explain payload using only planner-visible indexes.
501    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
502        &self,
503        query: &Query<E>,
504    ) -> Result<ExplainPlan, QueryError>
505    where
506        E: EntityKind<Canister = C>,
507    {
508        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
509    }
510
511    // Hash one typed query plan using only the indexes currently visible for
512    // the query's recovered store.
513    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
514        &self,
515        query: &Query<E>,
516    ) -> Result<String, QueryError>
517    where
518        E: EntityKind<Canister = C>,
519    {
520        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
521    }
522
523    // Explain one load execution shape using only planner-visible
524    // indexes from the recovered store state.
525    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
526        &self,
527        query: &Query<E>,
528    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
529    where
530        E: EntityValue + EntityKind<Canister = C>,
531    {
532        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
533    }
534
535    // Render one load execution descriptor plus route diagnostics using
536    // only planner-visible indexes from the recovered store state.
537    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
538        &self,
539        query: &Query<E>,
540    ) -> Result<String, QueryError>
541    where
542        E: EntityValue + EntityKind<Canister = C>,
543    {
544        self.with_query_visible_indexes(
545            query,
546            Query::<E>::explain_execution_verbose_with_visible_indexes,
547        )
548    }
549
550    // Explain one prepared fluent aggregate terminal using only
551    // planner-visible indexes from the recovered store state.
552    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
553        &self,
554        query: &Query<E>,
555        strategy: &S,
556    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
557    where
558        E: EntityValue + EntityKind<Canister = C>,
559        S: PreparedFluentAggregateExplainStrategy,
560    {
561        self.with_query_visible_indexes(query, |query, visible_indexes| {
562            query
563                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
564        })
565    }
566
567    // Explain one `bytes_by(field)` terminal using only planner-visible
568    // indexes from the recovered store state.
569    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
570        &self,
571        query: &Query<E>,
572        target_field: &str,
573    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
574    where
575        E: EntityValue + EntityKind<Canister = C>,
576    {
577        self.with_query_visible_indexes(query, |query, visible_indexes| {
578            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
579        })
580    }
581
582    // Explain one prepared fluent projection terminal using only
583    // planner-visible indexes from the recovered store state.
584    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
585        &self,
586        query: &Query<E>,
587        strategy: &PreparedFluentProjectionStrategy,
588    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
589    where
590        E: EntityValue + EntityKind<Canister = C>,
591    {
592        self.with_query_visible_indexes(query, |query, visible_indexes| {
593            query.explain_prepared_projection_terminal_with_visible_indexes(
594                visible_indexes,
595                strategy,
596            )
597        })
598    }
599
600    // Validate that one execution strategy is admissible for scalar paged load
601    // execution and fail closed on grouped/primary-key-only routes.
602    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
603        match family {
604            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
605                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
606            )),
607            ExecutionFamily::Ordered => Ok(()),
608            ExecutionFamily::Grouped => Err(QueryError::invariant(
609                "grouped queries execute via execute(), not page().execute()",
610            )),
611        }
612    }
613
614    // Validate that one execution strategy is admissible for the grouped
615    // execution surface.
616    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
617        match family {
618            ExecutionFamily::Grouped => Ok(()),
619            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
620                "grouped execution requires grouped logical plans",
621            )),
622        }
623    }
624
625    // Finalize one grouped cursor page into the outward grouped execution
626    // payload so grouped cursor encoding and continuation-shape validation
627    // stay owned by the session boundary.
628    fn finalize_grouped_execution_page(
629        page: GroupedCursorPage,
630        trace: Option<ExecutionTrace>,
631    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
632        let next_cursor = page
633            .next_cursor
634            .map(|token| {
635                let Some(token) = token.as_grouped() else {
636                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
637                };
638
639                token.encode().map_err(|err| {
640                    QueryError::serialize_internal(format!(
641                        "failed to serialize grouped continuation cursor: {err}"
642                    ))
643                })
644            })
645            .transpose()?;
646
647        Ok(PagedGroupedExecutionWithTrace::new(
648            page.rows,
649            next_cursor,
650            trace,
651        ))
652    }
653
654    // Execute one prepared grouped query result and finalize the outward
655    // grouped payload at the session boundary so fluent callers do not
656    // reassemble grouped result wrappers themselves.
657    fn execute_grouped_query_result<E>(
658        &self,
659        query: &Query<E>,
660        cursor_token: Option<&str>,
661    ) -> Result<LoadQueryResult<E>, QueryError>
662    where
663        E: PersistedRow<Canister = C> + EntityValue,
664    {
665        self.execute_grouped(query, cursor_token)
666            .map(LoadQueryResult::grouped)
667    }
668
669    // Execute one prepared grouped query result while returning the same empty
670    // scalar-attribution shell used by the grouped execution path in the
671    // diagnostics surface.
672    #[cfg(feature = "diagnostics")]
673    fn execute_grouped_query_result_with_attribution<E>(
674        &self,
675        plan: PreparedExecutionPlan<E>,
676    ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
677    where
678        E: PersistedRow<Canister = C> + EntityValue,
679    {
680        let (page, trace, phase_attribution) =
681            self.execute_grouped_plan_with_trace_with_phase_attribution(plan, None)?;
682        let grouped = Self::finalize_grouped_execution_page(page, trace)?;
683
684        Ok((
685            LoadQueryResult::grouped(grouped),
686            Self::grouped_query_execute_phase_attribution(phase_attribution),
687            0,
688        ))
689    }
690
691    // Execute one non-grouped prepared query result while normalizing load vs
692    // delete output into the shared outward load-result contract plus the
693    // scalar execution attribution tuple expected by the perf surface.
694    #[cfg(feature = "diagnostics")]
695    fn execute_scalar_query_result_with_attribution<E>(
696        &self,
697        mode: QueryMode,
698        plan: PreparedExecutionPlan<E>,
699    ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
700    where
701        E: PersistedRow<Canister = C> + EntityValue,
702    {
703        match mode {
704            QueryMode::Load(_) => {
705                let (rows, phase_attribution, response_decode_local_instructions) = self
706                    .load_executor::<E>()
707                    .execute_with_phase_attribution(plan)
708                    .map_err(QueryError::execute)?;
709
710                Ok((
711                    LoadQueryResult::rows(rows),
712                    Self::scalar_query_execute_phase_attribution(phase_attribution),
713                    response_decode_local_instructions,
714                ))
715            }
716            QueryMode::Delete(_) => {
717                let result = self.execute_query_dyn(mode, plan)?;
718
719                Ok((
720                    LoadQueryResult::rows(result),
721                    Self::empty_query_execute_phase_attribution(),
722                    0,
723                ))
724            }
725        }
726    }
727
728    /// Execute one scalar load/delete query and return materialized response rows.
729    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
730    where
731        E: PersistedRow<Canister = C> + EntityValue,
732    {
733        // Phase 1: compile typed intent into one prepared execution-plan contract.
734        let mode = query.mode();
735        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
736
737        // Phase 2: delegate execution to the shared compiled-plan entry path.
738        self.execute_query_dyn(mode, plan)
739    }
740
741    /// Execute one typed query while reporting the compile/execute split at
742    /// the shared fluent query seam.
743    #[cfg(feature = "diagnostics")]
744    #[doc(hidden)]
745    pub fn execute_query_result_with_attribution<E>(
746        &self,
747        query: &Query<E>,
748    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
749    where
750        E: PersistedRow<Canister = C> + EntityValue,
751    {
752        // Phase 1: measure compile work at the typed/fluent boundary,
753        // including the shared lower query-plan cache lookup/build exactly
754        // once. This preserves honest hit/miss attribution without
755        // double-building plans on one-shot cache misses.
756        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
757            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
758        });
759        let (plan, cache_attribution) = plan_and_cache?;
760
761        // Phase 2: execute one query result using the prepared plan produced
762        // by the compile/cache boundary above.
763        let (execute_local_instructions, result) = measure_query_stage(|| {
764            if query.has_grouping() {
765                self.execute_grouped_query_result_with_attribution(plan)
766            } else {
767                self.execute_scalar_query_result_with_attribution(query.mode(), plan)
768            }
769        });
770        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
771        let total_local_instructions =
772            compile_local_instructions.saturating_add(execute_local_instructions);
773
774        Ok((
775            result,
776            QueryExecutionAttribution {
777                compile_local_instructions,
778                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
779                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
780                direct_data_row_scan_local_instructions: execute_phase_attribution
781                    .direct_data_row_scan_local_instructions,
782                direct_data_row_key_stream_local_instructions: execute_phase_attribution
783                    .direct_data_row_key_stream_local_instructions,
784                direct_data_row_row_read_local_instructions: execute_phase_attribution
785                    .direct_data_row_row_read_local_instructions,
786                direct_data_row_key_encode_local_instructions: execute_phase_attribution
787                    .direct_data_row_key_encode_local_instructions,
788                direct_data_row_store_get_local_instructions: execute_phase_attribution
789                    .direct_data_row_store_get_local_instructions,
790                direct_data_row_order_window_local_instructions: execute_phase_attribution
791                    .direct_data_row_order_window_local_instructions,
792                direct_data_row_page_window_local_instructions: execute_phase_attribution
793                    .direct_data_row_page_window_local_instructions,
794                grouped_stream_local_instructions: execute_phase_attribution
795                    .grouped_stream_local_instructions,
796                grouped_fold_local_instructions: execute_phase_attribution
797                    .grouped_fold_local_instructions,
798                grouped_finalize_local_instructions: execute_phase_attribution
799                    .grouped_finalize_local_instructions,
800                grouped_count_borrowed_hash_computations: execute_phase_attribution
801                    .grouped_count
802                    .borrowed_hash_computations,
803                grouped_count_bucket_candidate_checks: execute_phase_attribution
804                    .grouped_count
805                    .bucket_candidate_checks,
806                grouped_count_existing_group_hits: execute_phase_attribution
807                    .grouped_count
808                    .existing_group_hits,
809                grouped_count_new_group_inserts: execute_phase_attribution
810                    .grouped_count
811                    .new_group_inserts,
812                grouped_count_row_materialization_local_instructions: execute_phase_attribution
813                    .grouped_count
814                    .row_materialization_local_instructions,
815                grouped_count_group_lookup_local_instructions: execute_phase_attribution
816                    .grouped_count
817                    .group_lookup_local_instructions,
818                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
819                    .grouped_count
820                    .existing_group_update_local_instructions,
821                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
822                    .grouped_count
823                    .new_group_insert_local_instructions,
824                response_decode_local_instructions,
825                execute_local_instructions,
826                total_local_instructions,
827                shared_query_plan_cache_hits: cache_attribution.hits,
828                shared_query_plan_cache_misses: cache_attribution.misses,
829            },
830        ))
831    }
832
833    // Execute one typed query through the unified row/grouped result surface so
834    // higher layers do not need to branch on grouped shape themselves.
835    #[doc(hidden)]
836    pub fn execute_query_result<E>(
837        &self,
838        query: &Query<E>,
839    ) -> Result<LoadQueryResult<E>, QueryError>
840    where
841        E: PersistedRow<Canister = C> + EntityValue,
842    {
843        if query.has_grouping() {
844            return self.execute_grouped_query_result(query, None);
845        }
846
847        self.execute_query(query).map(LoadQueryResult::rows)
848    }
849
850    /// Execute one typed delete query and return only the affected-row count.
851    #[doc(hidden)]
852    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
853    where
854        E: PersistedRow<Canister = C> + EntityValue,
855    {
856        // Phase 1: fail closed if the caller routes a non-delete query here.
857        if !query.mode().is_delete() {
858            return Err(QueryError::unsupported_query(
859                "delete count execution requires delete query mode",
860            ));
861        }
862
863        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
864        let plan = self
865            .compile_query_with_visible_indexes(query)?
866            .into_prepared_execution_plan();
867
868        // Phase 3: execute the shared delete core while skipping response-row materialization.
869        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
870            .map_err(QueryError::execute)
871    }
872
873    /// Execute one scalar query from one pre-built prepared execution contract.
874    ///
875    /// This is the shared compiled-plan entry boundary used by the typed
876    /// `execute_query(...)` surface and adjacent query execution facades.
877    pub(in crate::db) fn execute_query_dyn<E>(
878        &self,
879        mode: QueryMode,
880        plan: PreparedExecutionPlan<E>,
881    ) -> Result<EntityResponse<E>, QueryError>
882    where
883        E: PersistedRow<Canister = C> + EntityValue,
884    {
885        let result = match mode {
886            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
887            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
888        };
889
890        result.map_err(QueryError::execute)
891    }
892
893    // Shared load-query terminal wrapper: build plan, run under metrics, map
894    // execution errors into query-facing errors.
895    pub(in crate::db) fn execute_load_query_with<E, T>(
896        &self,
897        query: &Query<E>,
898        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
899    ) -> Result<T, QueryError>
900    where
901        E: PersistedRow<Canister = C> + EntityValue,
902    {
903        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
904
905        self.with_metrics(|| op(self.load_executor::<E>(), plan))
906            .map_err(QueryError::execute)
907    }
908
909    /// Build one trace payload for a query without executing it.
910    ///
911    /// This lightweight surface is intended for developer diagnostics:
912    /// plan hash, access strategy summary, and planner/executor route shape.
913    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
914    where
915        E: EntityKind<Canister = C>,
916    {
917        let compiled = self.compile_query_with_visible_indexes(query)?;
918        let explain = compiled.explain();
919        let plan_hash = compiled.plan_hash_hex();
920
921        let (executable, _) =
922            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
923        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
924        let execution_family = match query.mode() {
925            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
926            QueryMode::Delete(_) => None,
927        };
928
929        Ok(QueryTracePlan::new(
930            plan_hash,
931            access_strategy,
932            execution_family,
933            explain,
934        ))
935    }
936
937    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
938    pub(crate) fn execute_load_query_paged_with_trace<E>(
939        &self,
940        query: &Query<E>,
941        cursor_token: Option<&str>,
942    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
943    where
944        E: PersistedRow<Canister = C> + EntityValue,
945    {
946        // Phase 1: build/validate prepared execution plan and reject grouped plans.
947        let plan = self
948            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
949            .0;
950        Self::ensure_scalar_paged_execution_family(
951            plan.execution_family().map_err(QueryError::execute)?,
952        )?;
953
954        // Phase 2: decode external cursor token and validate it against plan surface.
955        let cursor_bytes = decode_optional_cursor_token(cursor_token)
956            .map_err(QueryError::from_cursor_plan_error)?;
957        let cursor = plan
958            .prepare_cursor(cursor_bytes.as_deref())
959            .map_err(QueryError::from_executor_plan_error)?;
960
961        // Phase 3: execute one traced page and encode outbound continuation token.
962        let (page, trace) = self
963            .with_metrics(|| {
964                self.load_executor::<E>()
965                    .execute_paged_with_cursor_traced(plan, cursor)
966            })
967            .map_err(QueryError::execute)?;
968        let next_cursor = page
969            .next_cursor
970            .map(|token| {
971                let Some(token) = token.as_scalar() else {
972                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
973                };
974
975                token.encode().map_err(|err| {
976                    QueryError::serialize_internal(format!(
977                        "failed to serialize continuation cursor: {err}"
978                    ))
979                })
980            })
981            .transpose()?;
982
983        Ok(PagedLoadExecutionWithTrace::new(
984            page.items,
985            next_cursor,
986            trace,
987        ))
988    }
989
990    /// Execute one grouped query page with optional grouped continuation cursor.
991    ///
992    /// This is the explicit grouped execution boundary; scalar load APIs reject
993    /// grouped plans to preserve scalar response contracts.
994    pub(in crate::db) fn execute_grouped<E>(
995        &self,
996        query: &Query<E>,
997        cursor_token: Option<&str>,
998    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
999    where
1000        E: PersistedRow<Canister = C> + EntityValue,
1001    {
1002        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
1003        Self::finalize_grouped_execution_page(page, trace)
1004    }
1005
1006    // Execute the canonical grouped query core and return the raw grouped page
1007    // plus optional execution trace before outward cursor formatting.
1008    fn execute_grouped_page_with_trace<E>(
1009        &self,
1010        query: &Query<E>,
1011        cursor_token: Option<&str>,
1012    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1013    where
1014        E: PersistedRow<Canister = C> + EntityValue,
1015    {
1016        // Phase 1: build the prepared execution plan once from the typed query.
1017        let plan = self
1018            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
1019            .0;
1020
1021        // Phase 2: reuse the shared prepared grouped execution path.
1022        self.execute_grouped_plan_with_trace(plan, cursor_token)
1023    }
1024
1025    // Execute one grouped prepared plan page with optional grouped cursor.
1026    fn execute_grouped_plan_with_trace<E>(
1027        &self,
1028        plan: PreparedExecutionPlan<E>,
1029        cursor_token: Option<&str>,
1030    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1031    where
1032        E: PersistedRow<Canister = C> + EntityValue,
1033    {
1034        // Phase 1: validate the prepared plan shape before decoding cursors.
1035        Self::ensure_grouped_execution_family(
1036            plan.execution_family().map_err(QueryError::execute)?,
1037        )?;
1038
1039        // Phase 2: decode external grouped cursor token and validate against plan.
1040        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1041            .map_err(QueryError::from_cursor_plan_error)?;
1042        let cursor = plan
1043            .prepare_grouped_cursor_token(cursor)
1044            .map_err(QueryError::from_executor_plan_error)?;
1045
1046        // Phase 3: execute one grouped page while preserving the structural
1047        // grouped cursor payload for whichever outward cursor format the caller needs.
1048        self.with_metrics(|| {
1049            self.load_executor::<E>()
1050                .execute_grouped_paged_with_cursor_traced(plan, cursor)
1051        })
1052        .map_err(QueryError::execute)
1053    }
1054
1055    #[cfg(feature = "diagnostics")]
1056    fn execute_grouped_plan_with_trace_with_phase_attribution<E>(
1057        &self,
1058        plan: PreparedExecutionPlan<E>,
1059        cursor_token: Option<&str>,
1060    ) -> Result<
1061        (
1062            GroupedCursorPage,
1063            Option<ExecutionTrace>,
1064            GroupedExecutePhaseAttribution,
1065        ),
1066        QueryError,
1067    >
1068    where
1069        E: PersistedRow<Canister = C> + EntityValue,
1070    {
1071        Self::ensure_grouped_execution_family(
1072            plan.execution_family().map_err(QueryError::execute)?,
1073        )?;
1074
1075        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1076            .map_err(QueryError::from_cursor_plan_error)?;
1077        let cursor = plan
1078            .prepare_grouped_cursor_token(cursor)
1079            .map_err(QueryError::from_executor_plan_error)?;
1080
1081        self.with_metrics(|| {
1082            self.load_executor::<E>()
1083                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(plan, cursor)
1084        })
1085        .map_err(QueryError::execute)
1086    }
1087}
1088
1089impl QueryPlanCacheKey {
1090    fn for_authority_with_normalized_predicate(
1091        authority: crate::db::executor::EntityAuthority,
1092        schema_fingerprint: CommitSchemaFingerprint,
1093        visibility: QueryPlanVisibility,
1094        query: &StructuralQuery,
1095        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1096    ) -> Self {
1097        Self::for_authority_with_normalized_predicate_and_method_version(
1098            authority,
1099            schema_fingerprint,
1100            visibility,
1101            query,
1102            normalized_predicate,
1103            SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
1104        )
1105    }
1106
1107    // Assemble the canonical cache-key shell once so the test and
1108    // normalized-predicate constructors only decide which structural query key
1109    // they feed into the shared session cache identity.
1110    const fn from_authority_parts(
1111        authority: crate::db::executor::EntityAuthority,
1112        schema_fingerprint: CommitSchemaFingerprint,
1113        visibility: QueryPlanVisibility,
1114        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1115        cache_method_version: u8,
1116    ) -> Self {
1117        Self {
1118            cache_method_version,
1119            entity_path: authority.entity_path(),
1120            schema_fingerprint,
1121            visibility,
1122            structural_query,
1123        }
1124    }
1125
1126    #[cfg(test)]
1127    fn for_authority_with_method_version(
1128        authority: crate::db::executor::EntityAuthority,
1129        schema_fingerprint: CommitSchemaFingerprint,
1130        visibility: QueryPlanVisibility,
1131        query: &StructuralQuery,
1132        cache_method_version: u8,
1133    ) -> Self {
1134        Self::from_authority_parts(
1135            authority,
1136            schema_fingerprint,
1137            visibility,
1138            query.structural_cache_key(),
1139            cache_method_version,
1140        )
1141    }
1142
1143    fn for_authority_with_normalized_predicate_and_method_version(
1144        authority: crate::db::executor::EntityAuthority,
1145        schema_fingerprint: CommitSchemaFingerprint,
1146        visibility: QueryPlanVisibility,
1147        query: &StructuralQuery,
1148        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1149        cache_method_version: u8,
1150    ) -> Self {
1151        Self::from_authority_parts(
1152            authority,
1153            schema_fingerprint,
1154            visibility,
1155            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1156            cache_method_version,
1157        )
1158    }
1159}