Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10    db::{
11        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13        access::AccessStrategy,
14        commit::CommitSchemaFingerprint,
15        cursor::{
16            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17        },
18        diagnostics::ExecutionTrace,
19        executor::{
20            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21            SharedPreparedExecutionPlan,
22        },
23        query::builder::{
24            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25        },
26        query::explain::{
27            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28        },
29        query::{
30            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32        },
33    },
34    error::InternalError,
35    model::entity::EntityModel,
36    traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
45type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
47// Bump this when the shared lower query-plan cache key meaning changes in a
48// way that must force old in-heap entries to miss instead of aliasing.
49const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
51#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) enum QueryPlanVisibility {
53    StoreNotReady,
54    StoreReady,
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
58pub(in crate::db) struct QueryPlanCacheKey {
59    cache_method_version: u8,
60    entity_path: &'static str,
61    schema_fingerprint: CommitSchemaFingerprint,
62    visibility: QueryPlanVisibility,
63    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
64}
65
66#[derive(Clone, Debug)]
67pub(in crate::db) struct QueryPlanCacheEntry {
68    logical_plan: AccessPlannedQuery,
69    prepared_plan: SharedPreparedExecutionPlan,
70}
71
72impl QueryPlanCacheEntry {
73    #[must_use]
74    pub(in crate::db) const fn new(
75        logical_plan: AccessPlannedQuery,
76        prepared_plan: SharedPreparedExecutionPlan,
77    ) -> Self {
78        Self {
79            logical_plan,
80            prepared_plan,
81        }
82    }
83
84    #[must_use]
85    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
86        &self.logical_plan
87    }
88
89    #[must_use]
90    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
91        self.prepared_plan.typed_clone::<E>()
92    }
93}
94
95pub(in crate::db) type QueryPlanCache =
96    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
97
98thread_local! {
99    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
100    // facades can share prepared logical plans across update/query calls while
101    // tests and multi-registry host processes remain isolated by registry
102    // identity.
103    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
104        RefCell::new(HashMap::default());
105}
106
107#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
108pub(in crate::db) struct QueryPlanCacheAttribution {
109    pub hits: u64,
110    pub misses: u64,
111}
112
113impl QueryPlanCacheAttribution {
114    #[must_use]
115    const fn hit() -> Self {
116        Self { hits: 1, misses: 0 }
117    }
118
119    #[must_use]
120    const fn miss() -> Self {
121        Self { hits: 0, misses: 1 }
122    }
123}
124
125///
126/// QueryExecutionAttribution
127///
128/// QueryExecutionAttribution records the top-level compile/execute split for
129/// typed/fluent query execution at the session boundary.
130///
131#[cfg(feature = "perf-attribution")]
132#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
133pub struct QueryExecutionAttribution {
134    pub compile_local_instructions: u64,
135    pub runtime_local_instructions: u64,
136    pub finalize_local_instructions: u64,
137    pub direct_data_row_scan_local_instructions: u64,
138    pub direct_data_row_key_stream_local_instructions: u64,
139    pub direct_data_row_row_read_local_instructions: u64,
140    pub direct_data_row_key_encode_local_instructions: u64,
141    pub direct_data_row_store_get_local_instructions: u64,
142    pub direct_data_row_order_window_local_instructions: u64,
143    pub direct_data_row_page_window_local_instructions: u64,
144    pub response_decode_local_instructions: u64,
145    pub execute_local_instructions: u64,
146    pub total_local_instructions: u64,
147    pub shared_query_plan_cache_hits: u64,
148    pub shared_query_plan_cache_misses: u64,
149}
150
151#[cfg(feature = "perf-attribution")]
152#[expect(
153    clippy::missing_const_for_fn,
154    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
155)]
156fn read_query_local_instruction_counter() -> u64 {
157    #[cfg(target_arch = "wasm32")]
158    {
159        canic_cdk::api::performance_counter(1)
160    }
161
162    #[cfg(not(target_arch = "wasm32"))]
163    {
164        0
165    }
166}
167
168#[cfg(feature = "perf-attribution")]
169fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
170    let start = read_query_local_instruction_counter();
171    let result = run();
172    let delta = read_query_local_instruction_counter().saturating_sub(start);
173
174    (delta, result)
175}
176
177impl<C: CanisterKind> DbSession<C> {
178    #[cfg(feature = "perf-attribution")]
179    const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
180        ScalarExecutePhaseAttribution {
181            runtime_local_instructions: 0,
182            finalize_local_instructions: 0,
183            direct_data_row_scan_local_instructions: 0,
184            direct_data_row_key_stream_local_instructions: 0,
185            direct_data_row_row_read_local_instructions: 0,
186            direct_data_row_key_encode_local_instructions: 0,
187            direct_data_row_store_get_local_instructions: 0,
188            direct_data_row_order_window_local_instructions: 0,
189            direct_data_row_page_window_local_instructions: 0,
190        }
191    }
192
193    fn query_plan_cache_scope_id(&self) -> usize {
194        self.db.cache_scope_id()
195    }
196
197    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
198        let scope_id = self.query_plan_cache_scope_id();
199
200        QUERY_PLAN_CACHES.with(|caches| {
201            let mut caches = caches.borrow_mut();
202            let cache = caches.entry(scope_id).or_default();
203
204            f(cache)
205        })
206    }
207
208    const fn visible_indexes_for_model(
209        model: &'static EntityModel,
210        visibility: QueryPlanVisibility,
211    ) -> VisibleIndexes<'static> {
212        match visibility {
213            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
214            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
215        }
216    }
217
218    #[cfg(test)]
219    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
220        self.with_query_plan_cache(|cache| cache.len())
221    }
222
223    #[cfg(test)]
224    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
225        self.with_query_plan_cache(QueryPlanCache::clear);
226    }
227
228    pub(in crate::db) fn query_plan_visibility_for_store_path(
229        &self,
230        store_path: &'static str,
231    ) -> Result<QueryPlanVisibility, QueryError> {
232        let store = self
233            .db
234            .recovered_store(store_path)
235            .map_err(QueryError::execute)?;
236        let visibility = if store.index_state() == crate::db::IndexState::Ready {
237            QueryPlanVisibility::StoreReady
238        } else {
239            QueryPlanVisibility::StoreNotReady
240        };
241
242        Ok(visibility)
243    }
244
245    pub(in crate::db) fn cached_query_plan_entry_for_authority(
246        &self,
247        authority: crate::db::executor::EntityAuthority,
248        schema_fingerprint: CommitSchemaFingerprint,
249        query: &StructuralQuery,
250    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
251        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
252        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
253        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
254        let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
255            authority,
256            schema_fingerprint,
257            visibility,
258            query,
259            normalized_predicate.as_ref(),
260        );
261
262        {
263            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
264            if let Some(entry) = cached {
265                return Ok((entry, QueryPlanCacheAttribution::hit()));
266            }
267        }
268
269        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
270            &visible_indexes,
271            normalized_predicate,
272        )?;
273        let entry = QueryPlanCacheEntry::new(
274            plan.clone(),
275            SharedPreparedExecutionPlan::from_plan(authority, plan),
276        );
277        self.with_query_plan_cache(|cache| {
278            cache.insert(cache_key, entry.clone());
279        });
280
281        Ok((entry, QueryPlanCacheAttribution::miss()))
282    }
283
284    #[cfg(test)]
285    pub(in crate::db) fn query_plan_cache_key_for_tests(
286        authority: crate::db::executor::EntityAuthority,
287        schema_fingerprint: CommitSchemaFingerprint,
288        visibility: QueryPlanVisibility,
289        query: &StructuralQuery,
290        cache_method_version: u8,
291    ) -> QueryPlanCacheKey {
292        QueryPlanCacheKey::for_authority_with_method_version(
293            authority,
294            schema_fingerprint,
295            visibility,
296            query,
297            cache_method_version,
298        )
299    }
300
301    pub(in crate::db) fn cached_structural_plan_for_authority(
302        &self,
303        authority: crate::db::executor::EntityAuthority,
304        schema_fingerprint: CommitSchemaFingerprint,
305        query: &StructuralQuery,
306    ) -> Result<AccessPlannedQuery, QueryError> {
307        let (entry, _) =
308            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
309
310        Ok(entry.logical_plan().clone())
311    }
312
313    // Resolve the planner-visible index slice for one typed query exactly once
314    // at the session boundary before handing execution/planning off to query-owned logic.
315    fn with_query_visible_indexes<E, T>(
316        &self,
317        query: &Query<E>,
318        op: impl FnOnce(
319            &Query<E>,
320            &crate::db::query::plan::VisibleIndexes<'static>,
321        ) -> Result<T, QueryError>,
322    ) -> Result<T, QueryError>
323    where
324        E: EntityKind<Canister = C>,
325    {
326        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
327        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
328
329        op(query, &visible_indexes)
330    }
331
332    // Resolve one typed structural query onto the shared lower plan cache so
333    // typed/fluent callers do not each duplicate the entity metadata plumbing.
334    fn cached_structural_plan_for_entity<E>(
335        &self,
336        query: &StructuralQuery,
337    ) -> Result<AccessPlannedQuery, QueryError>
338    where
339        E: EntityKind<Canister = C>,
340    {
341        self.cached_structural_plan_for_authority(
342            crate::db::executor::EntityAuthority::for_type::<E>(),
343            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
344            query,
345        )
346    }
347
348    // Resolve one cached structural plan for the typed entity and then
349    // project it into one caller-owned wrapper so planned vs compiled session
350    // surfaces do not each duplicate the same cache lookup and entity wiring.
351    fn map_cached_structural_plan_for_entity<E, T>(
352        &self,
353        query: &StructuralQuery,
354        map: impl FnOnce(AccessPlannedQuery) -> T,
355    ) -> Result<T, QueryError>
356    where
357        E: EntityKind<Canister = C>,
358    {
359        let plan = self.cached_structural_plan_for_entity::<E>(query)?;
360
361        Ok(map(plan))
362    }
363
364    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
365        &self,
366        query: &StructuralQuery,
367    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
368    where
369        E: EntityKind<Canister = C>,
370    {
371        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
372            crate::db::executor::EntityAuthority::for_type::<E>(),
373            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
374            query,
375        )?;
376
377        Ok((entry.typed_prepared_plan::<E>(), attribution))
378    }
379
380    // Compile one typed query using only the indexes currently visible for the
381    // query's recovered store.
382    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
383        &self,
384        query: &Query<E>,
385    ) -> Result<CompiledQuery<E>, QueryError>
386    where
387        E: EntityKind<Canister = C>,
388    {
389        self.map_cached_structural_plan_for_entity::<E, _>(
390            query.structural(),
391            Query::<E>::compiled_query_from_plan,
392        )
393    }
394
395    // Build one logical planned-query shell using only the indexes currently
396    // visible for the query's recovered store.
397    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
398        &self,
399        query: &Query<E>,
400    ) -> Result<PlannedQuery<E>, QueryError>
401    where
402        E: EntityKind<Canister = C>,
403    {
404        self.map_cached_structural_plan_for_entity::<E, _>(
405            query.structural(),
406            Query::<E>::planned_query_from_plan,
407        )
408    }
409
410    // Project one logical explain payload using only planner-visible indexes.
411    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
412        &self,
413        query: &Query<E>,
414    ) -> Result<ExplainPlan, QueryError>
415    where
416        E: EntityKind<Canister = C>,
417    {
418        self.with_query_visible_indexes(query, |query, visible_indexes| {
419            query.explain_with_visible_indexes(visible_indexes)
420        })
421    }
422
423    // Hash one typed query plan using only the indexes currently visible for
424    // the query's recovered store.
425    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
426        &self,
427        query: &Query<E>,
428    ) -> Result<String, QueryError>
429    where
430        E: EntityKind<Canister = C>,
431    {
432        self.with_query_visible_indexes(query, |query, visible_indexes| {
433            query.plan_hash_hex_with_visible_indexes(visible_indexes)
434        })
435    }
436
437    // Explain one load execution shape using only planner-visible
438    // indexes from the recovered store state.
439    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
440        &self,
441        query: &Query<E>,
442    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
443    where
444        E: EntityValue + EntityKind<Canister = C>,
445    {
446        self.with_query_visible_indexes(query, |query, visible_indexes| {
447            query.explain_execution_with_visible_indexes(visible_indexes)
448        })
449    }
450
451    // Render one load execution descriptor plus route diagnostics using
452    // only planner-visible indexes from the recovered store state.
453    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
454        &self,
455        query: &Query<E>,
456    ) -> Result<String, QueryError>
457    where
458        E: EntityValue + EntityKind<Canister = C>,
459    {
460        self.with_query_visible_indexes(query, |query, visible_indexes| {
461            query.explain_execution_verbose_with_visible_indexes(visible_indexes)
462        })
463    }
464
465    // Explain one prepared fluent aggregate terminal using only
466    // planner-visible indexes from the recovered store state.
467    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
468        &self,
469        query: &Query<E>,
470        strategy: &S,
471    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
472    where
473        E: EntityValue + EntityKind<Canister = C>,
474        S: PreparedFluentAggregateExplainStrategy,
475    {
476        self.with_query_visible_indexes(query, |query, visible_indexes| {
477            query
478                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
479        })
480    }
481
482    // Explain one `bytes_by(field)` terminal using only planner-visible
483    // indexes from the recovered store state.
484    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
485        &self,
486        query: &Query<E>,
487        target_field: &str,
488    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
489    where
490        E: EntityValue + EntityKind<Canister = C>,
491    {
492        self.with_query_visible_indexes(query, |query, visible_indexes| {
493            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
494        })
495    }
496
497    // Explain one prepared fluent projection terminal using only
498    // planner-visible indexes from the recovered store state.
499    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
500        &self,
501        query: &Query<E>,
502        strategy: &PreparedFluentProjectionStrategy,
503    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
504    where
505        E: EntityValue + EntityKind<Canister = C>,
506    {
507        self.with_query_visible_indexes(query, |query, visible_indexes| {
508            query.explain_prepared_projection_terminal_with_visible_indexes(
509                visible_indexes,
510                strategy,
511            )
512        })
513    }
514
515    // Validate that one execution strategy is admissible for scalar paged load
516    // execution and fail closed on grouped/primary-key-only routes.
517    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
518        match family {
519            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
520                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
521            )),
522            ExecutionFamily::Ordered => Ok(()),
523            ExecutionFamily::Grouped => Err(QueryError::invariant(
524                "grouped queries execute via execute(), not page().execute()",
525            )),
526        }
527    }
528
529    // Validate that one execution strategy is admissible for the grouped
530    // execution surface.
531    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
532        match family {
533            ExecutionFamily::Grouped => Ok(()),
534            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
535                "grouped execution requires grouped logical plans",
536            )),
537        }
538    }
539
540    /// Execute one scalar load/delete query and return materialized response rows.
541    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
542    where
543        E: PersistedRow<Canister = C> + EntityValue,
544    {
545        // Phase 1: compile typed intent into one prepared execution-plan contract.
546        let mode = query.mode();
547        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
548
549        // Phase 2: delegate execution to the shared compiled-plan entry path.
550        self.execute_query_dyn(mode, plan)
551    }
552
553    /// Execute one typed query while reporting the compile/execute split at
554    /// the shared fluent query seam.
555    #[cfg(feature = "perf-attribution")]
556    #[doc(hidden)]
557    pub fn execute_query_result_with_attribution<E>(
558        &self,
559        query: &Query<E>,
560    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
561    where
562        E: PersistedRow<Canister = C> + EntityValue,
563    {
564        // Phase 1: measure compile work at the typed/fluent boundary,
565        // including the shared lower query-plan cache lookup/build exactly
566        // once. This preserves honest hit/miss attribution without
567        // double-building plans on one-shot cache misses.
568        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
569            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
570        });
571        let (plan, cache_attribution) = plan_and_cache?;
572
573        // Phase 2: execute one query result using the prepared plan produced
574        // by the compile/cache boundary above.
575        let (execute_local_instructions, result) = measure_query_stage(|| {
576            if query.has_grouping() {
577                self.execute_grouped_plan_with_trace(plan, None)
578                    .map(|(page, trace)| {
579                        let next_cursor = page
580                            .next_cursor
581                            .map(|token| {
582                                let Some(token) = token.as_grouped() else {
583                                    return Err(
584                                        QueryError::grouped_paged_emitted_scalar_continuation(),
585                                    );
586                                };
587
588                                token.encode().map_err(|err| {
589                                    QueryError::serialize_internal(format!(
590                                        "failed to serialize grouped continuation cursor: {err}"
591                                    ))
592                                })
593                            })
594                            .transpose()?;
595
596                        Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
597                            (
598                                LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
599                                    page.rows,
600                                    next_cursor,
601                                    trace,
602                                )),
603                                Self::empty_scalar_execute_phase_attribution(),
604                                0,
605                            ),
606                        )
607                    })?
608            } else {
609                match query.mode() {
610                    QueryMode::Load(_) => {
611                        let (rows, phase_attribution, response_decode_local_instructions) = self
612                            .load_executor::<E>()
613                            .execute_with_phase_attribution(plan)
614                            .map_err(QueryError::execute)?;
615
616                        Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
617                            (
618                                LoadQueryResult::Rows(rows),
619                                phase_attribution,
620                                response_decode_local_instructions,
621                            ),
622                        )
623                    }
624                    QueryMode::Delete(_) => {
625                        let result = self.execute_query_dyn(query.mode(), plan)?;
626
627                        Ok((
628                            LoadQueryResult::Rows(result),
629                            Self::empty_scalar_execute_phase_attribution(),
630                            0,
631                        ))
632                    }
633                }
634            }
635        });
636        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
637        let total_local_instructions =
638            compile_local_instructions.saturating_add(execute_local_instructions);
639
640        Ok((
641            result,
642            QueryExecutionAttribution {
643                compile_local_instructions,
644                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
645                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
646                direct_data_row_scan_local_instructions: execute_phase_attribution
647                    .direct_data_row_scan_local_instructions,
648                direct_data_row_key_stream_local_instructions: execute_phase_attribution
649                    .direct_data_row_key_stream_local_instructions,
650                direct_data_row_row_read_local_instructions: execute_phase_attribution
651                    .direct_data_row_row_read_local_instructions,
652                direct_data_row_key_encode_local_instructions: execute_phase_attribution
653                    .direct_data_row_key_encode_local_instructions,
654                direct_data_row_store_get_local_instructions: execute_phase_attribution
655                    .direct_data_row_store_get_local_instructions,
656                direct_data_row_order_window_local_instructions: execute_phase_attribution
657                    .direct_data_row_order_window_local_instructions,
658                direct_data_row_page_window_local_instructions: execute_phase_attribution
659                    .direct_data_row_page_window_local_instructions,
660                response_decode_local_instructions,
661                execute_local_instructions,
662                total_local_instructions,
663                shared_query_plan_cache_hits: cache_attribution.hits,
664                shared_query_plan_cache_misses: cache_attribution.misses,
665            },
666        ))
667    }
668
669    // Execute one typed query through the unified row/grouped result surface so
670    // higher layers do not need to branch on grouped shape themselves.
671    #[doc(hidden)]
672    pub fn execute_query_result<E>(
673        &self,
674        query: &Query<E>,
675    ) -> Result<LoadQueryResult<E>, QueryError>
676    where
677        E: PersistedRow<Canister = C> + EntityValue,
678    {
679        if query.has_grouping() {
680            return self
681                .execute_grouped(query, None)
682                .map(LoadQueryResult::Grouped);
683        }
684
685        self.execute_query(query).map(LoadQueryResult::Rows)
686    }
687
688    /// Execute one typed delete query and return only the affected-row count.
689    #[doc(hidden)]
690    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
691    where
692        E: PersistedRow<Canister = C> + EntityValue,
693    {
694        // Phase 1: fail closed if the caller routes a non-delete query here.
695        if !query.mode().is_delete() {
696            return Err(QueryError::unsupported_query(
697                "delete count execution requires delete query mode",
698            ));
699        }
700
701        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
702        let plan = self
703            .compile_query_with_visible_indexes(query)?
704            .into_prepared_execution_plan();
705
706        // Phase 3: execute the shared delete core while skipping response-row materialization.
707        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
708            .map_err(QueryError::execute)
709    }
710
711    /// Execute one scalar query from one pre-built prepared execution contract.
712    ///
713    /// This is the shared compiled-plan entry boundary used by the typed
714    /// `execute_query(...)` surface and adjacent query execution facades.
715    pub(in crate::db) fn execute_query_dyn<E>(
716        &self,
717        mode: QueryMode,
718        plan: PreparedExecutionPlan<E>,
719    ) -> Result<EntityResponse<E>, QueryError>
720    where
721        E: PersistedRow<Canister = C> + EntityValue,
722    {
723        let result = match mode {
724            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
725            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
726        };
727
728        result.map_err(QueryError::execute)
729    }
730
731    // Shared load-query terminal wrapper: build plan, run under metrics, map
732    // execution errors into query-facing errors.
733    pub(in crate::db) fn execute_load_query_with<E, T>(
734        &self,
735        query: &Query<E>,
736        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
737    ) -> Result<T, QueryError>
738    where
739        E: PersistedRow<Canister = C> + EntityValue,
740    {
741        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
742
743        self.with_metrics(|| op(self.load_executor::<E>(), plan))
744            .map_err(QueryError::execute)
745    }
746
747    /// Build one trace payload for a query without executing it.
748    ///
749    /// This lightweight surface is intended for developer diagnostics:
750    /// plan hash, access strategy summary, and planner/executor route shape.
751    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
752    where
753        E: EntityKind<Canister = C>,
754    {
755        let compiled = self.compile_query_with_visible_indexes(query)?;
756        let explain = compiled.explain();
757        let plan_hash = compiled.plan_hash_hex();
758
759        let (executable, _) =
760            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
761        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
762        let execution_family = match query.mode() {
763            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
764            QueryMode::Delete(_) => None,
765        };
766
767        Ok(QueryTracePlan::new(
768            plan_hash,
769            access_strategy,
770            execution_family,
771            explain,
772        ))
773    }
774
775    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
776    pub(crate) fn execute_load_query_paged_with_trace<E>(
777        &self,
778        query: &Query<E>,
779        cursor_token: Option<&str>,
780    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
781    where
782        E: PersistedRow<Canister = C> + EntityValue,
783    {
784        // Phase 1: build/validate prepared execution plan and reject grouped plans.
785        let plan = self
786            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
787            .0;
788        Self::ensure_scalar_paged_execution_family(
789            plan.execution_family().map_err(QueryError::execute)?,
790        )?;
791
792        // Phase 2: decode external cursor token and validate it against plan surface.
793        let cursor_bytes = decode_optional_cursor_token(cursor_token)
794            .map_err(QueryError::from_cursor_plan_error)?;
795        let cursor = plan
796            .prepare_cursor(cursor_bytes.as_deref())
797            .map_err(QueryError::from_executor_plan_error)?;
798
799        // Phase 3: execute one traced page and encode outbound continuation token.
800        let (page, trace) = self
801            .with_metrics(|| {
802                self.load_executor::<E>()
803                    .execute_paged_with_cursor_traced(plan, cursor)
804            })
805            .map_err(QueryError::execute)?;
806        let next_cursor = page
807            .next_cursor
808            .map(|token| {
809                let Some(token) = token.as_scalar() else {
810                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
811                };
812
813                token.encode().map_err(|err| {
814                    QueryError::serialize_internal(format!(
815                        "failed to serialize continuation cursor: {err}"
816                    ))
817                })
818            })
819            .transpose()?;
820
821        Ok(PagedLoadExecutionWithTrace::new(
822            page.items,
823            next_cursor,
824            trace,
825        ))
826    }
827
828    /// Execute one grouped query page with optional grouped continuation cursor.
829    ///
830    /// This is the explicit grouped execution boundary; scalar load APIs reject
831    /// grouped plans to preserve scalar response contracts.
832    pub(in crate::db) fn execute_grouped<E>(
833        &self,
834        query: &Query<E>,
835        cursor_token: Option<&str>,
836    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
837    where
838        E: PersistedRow<Canister = C> + EntityValue,
839    {
840        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
841        let next_cursor = page
842            .next_cursor
843            .map(|token| {
844                let Some(token) = token.as_grouped() else {
845                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
846                };
847
848                token.encode().map_err(|err| {
849                    QueryError::serialize_internal(format!(
850                        "failed to serialize grouped continuation cursor: {err}"
851                    ))
852                })
853            })
854            .transpose()?;
855
856        Ok(PagedGroupedExecutionWithTrace::new(
857            page.rows,
858            next_cursor,
859            trace,
860        ))
861    }
862
863    // Execute the canonical grouped query core and return the raw grouped page
864    // plus optional execution trace before outward cursor formatting.
865    fn execute_grouped_page_with_trace<E>(
866        &self,
867        query: &Query<E>,
868        cursor_token: Option<&str>,
869    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
870    where
871        E: PersistedRow<Canister = C> + EntityValue,
872    {
873        // Phase 1: build the prepared execution plan once from the typed query.
874        let plan = self
875            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
876            .0;
877
878        // Phase 2: reuse the shared prepared grouped execution path.
879        self.execute_grouped_plan_with_trace(plan, cursor_token)
880    }
881
882    // Execute one grouped prepared plan page with optional grouped cursor.
883    fn execute_grouped_plan_with_trace<E>(
884        &self,
885        plan: PreparedExecutionPlan<E>,
886        cursor_token: Option<&str>,
887    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
888    where
889        E: PersistedRow<Canister = C> + EntityValue,
890    {
891        // Phase 1: validate the prepared plan shape before decoding cursors.
892        Self::ensure_grouped_execution_family(
893            plan.execution_family().map_err(QueryError::execute)?,
894        )?;
895
896        // Phase 2: decode external grouped cursor token and validate against plan.
897        let cursor = decode_optional_grouped_cursor_token(cursor_token)
898            .map_err(QueryError::from_cursor_plan_error)?;
899        let cursor = plan
900            .prepare_grouped_cursor_token(cursor)
901            .map_err(QueryError::from_executor_plan_error)?;
902
903        // Phase 3: execute one grouped page while preserving the structural
904        // grouped cursor payload for whichever outward cursor format the caller needs.
905        self.with_metrics(|| {
906            self.load_executor::<E>()
907                .execute_grouped_paged_with_cursor_traced(plan, cursor)
908        })
909        .map_err(QueryError::execute)
910    }
911}
912
913impl QueryPlanCacheKey {
914    fn for_authority_with_normalized_predicate(
915        authority: crate::db::executor::EntityAuthority,
916        schema_fingerprint: CommitSchemaFingerprint,
917        visibility: QueryPlanVisibility,
918        query: &StructuralQuery,
919        normalized_predicate: Option<&crate::db::predicate::Predicate>,
920    ) -> Self {
921        Self::for_authority_with_normalized_predicate_and_method_version(
922            authority,
923            schema_fingerprint,
924            visibility,
925            query,
926            normalized_predicate,
927            SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
928        )
929    }
930
931    // Assemble the canonical cache-key shell once so the test and
932    // normalized-predicate constructors only decide which structural query key
933    // they feed into the shared session cache identity.
934    const fn from_authority_parts(
935        authority: crate::db::executor::EntityAuthority,
936        schema_fingerprint: CommitSchemaFingerprint,
937        visibility: QueryPlanVisibility,
938        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
939        cache_method_version: u8,
940    ) -> Self {
941        Self {
942            cache_method_version,
943            entity_path: authority.entity_path(),
944            schema_fingerprint,
945            visibility,
946            structural_query,
947        }
948    }
949
950    #[cfg(test)]
951    fn for_authority_with_method_version(
952        authority: crate::db::executor::EntityAuthority,
953        schema_fingerprint: CommitSchemaFingerprint,
954        visibility: QueryPlanVisibility,
955        query: &StructuralQuery,
956        cache_method_version: u8,
957    ) -> Self {
958        Self::from_authority_parts(
959            authority,
960            schema_fingerprint,
961            visibility,
962            query.structural_cache_key(),
963            cache_method_version,
964        )
965    }
966
967    fn for_authority_with_normalized_predicate_and_method_version(
968        authority: crate::db::executor::EntityAuthority,
969        schema_fingerprint: CommitSchemaFingerprint,
970        visibility: QueryPlanVisibility,
971        query: &StructuralQuery,
972        normalized_predicate: Option<&crate::db::predicate::Predicate>,
973        cache_method_version: u8,
974    ) -> Self {
975        Self::from_authority_parts(
976            authority,
977            schema_fingerprint,
978            visibility,
979            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
980            cache_method_version,
981        )
982    }
983}