Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10    db::{
11        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13        access::AccessStrategy,
14        commit::CommitSchemaFingerprint,
15        cursor::{
16            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17        },
18        diagnostics::ExecutionTrace,
19        executor::{
20            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21            SharedPreparedExecutionPlan,
22        },
23        query::builder::{
24            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25        },
26        query::explain::{
27            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28        },
29        query::{
30            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32        },
33    },
34    error::InternalError,
35    model::entity::EntityModel,
36    traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
45type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
// Version tag for the meaning of the shared lower query-plan cache key.
// Increment it whenever that meaning changes, so existing in-heap entries
// miss instead of aliasing onto keys built under the new rules.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
51#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) enum QueryPlanVisibility {
53    StoreNotReady,
54    StoreReady,
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
58pub(in crate::db) struct QueryPlanCacheKey {
59    cache_method_version: u8,
60    entity_path: &'static str,
61    schema_fingerprint: CommitSchemaFingerprint,
62    visibility: QueryPlanVisibility,
63    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
64}
65
66#[derive(Clone, Debug)]
67pub(in crate::db) struct QueryPlanCacheEntry {
68    logical_plan: AccessPlannedQuery,
69    prepared_plan: SharedPreparedExecutionPlan,
70}
71
72impl QueryPlanCacheEntry {
73    #[must_use]
74    pub(in crate::db) const fn new(
75        logical_plan: AccessPlannedQuery,
76        prepared_plan: SharedPreparedExecutionPlan,
77    ) -> Self {
78        Self {
79            logical_plan,
80            prepared_plan,
81        }
82    }
83
84    #[must_use]
85    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
86        &self.logical_plan
87    }
88
89    #[must_use]
90    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
91        self.prepared_plan.typed_clone::<E>()
92    }
93}
94
95pub(in crate::db) type QueryPlanCache =
96    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
97
98thread_local! {
99    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
100    // facades can share prepared logical plans across update/query calls while
101    // tests and multi-registry host processes remain isolated by registry
102    // identity.
103    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
104        RefCell::new(HashMap::default());
105}
106
107#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
108pub(in crate::db) struct QueryPlanCacheAttribution {
109    pub hits: u64,
110    pub misses: u64,
111}
112
113impl QueryPlanCacheAttribution {
114    #[must_use]
115    const fn hit() -> Self {
116        Self { hits: 1, misses: 0 }
117    }
118
119    #[must_use]
120    const fn miss() -> Self {
121        Self { hits: 0, misses: 1 }
122    }
123}
124
///
/// QueryExecutionAttribution
///
/// Instruction-count breakdown for one typed/fluent query executed at the
/// session boundary: the compile stage, the execute stage, the executor's
/// own phase counters, and the shared plan-cache outcome.
///
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Instructions measured around the shared plan-cache lookup/build.
    pub compile_local_instructions: u64,
    // The nine counters below are copied from the executor's scalar
    // execute-phase attribution; they are all zero when the session supplies
    // the empty attribution instead.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    /// Response-decode instructions reported by the load executor.
    pub response_decode_local_instructions: u64,
    /// Instructions measured around the whole execute stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute stage measurements.
    pub total_local_instructions: u64,
    /// Shared query-plan cache hits recorded during the compile stage.
    pub shared_query_plan_cache_hits: u64,
    /// Shared query-plan cache misses recorded during the compile stage.
    pub shared_query_plan_cache_misses: u64,
}
150
// Read the local instruction counter used for query stage measurements.
// Non-wasm32 targets always report zero; wasm32 reads performance counter 1
// from the canister runtime.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
167
// Run one fallible stage and return the instruction-counter delta observed
// around it together with the stage's own result. The delta saturates at
// zero instead of underflowing.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
176
177impl<C: CanisterKind> DbSession<C> {
178    #[cfg(feature = "perf-attribution")]
179    const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
180        ScalarExecutePhaseAttribution {
181            runtime_local_instructions: 0,
182            finalize_local_instructions: 0,
183            direct_data_row_scan_local_instructions: 0,
184            direct_data_row_key_stream_local_instructions: 0,
185            direct_data_row_row_read_local_instructions: 0,
186            direct_data_row_key_encode_local_instructions: 0,
187            direct_data_row_store_get_local_instructions: 0,
188            direct_data_row_order_window_local_instructions: 0,
189            direct_data_row_page_window_local_instructions: 0,
190        }
191    }
192
193    fn query_plan_cache_scope_id(&self) -> usize {
194        self.db.cache_scope_id()
195    }
196
197    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
198        let scope_id = self.query_plan_cache_scope_id();
199
200        QUERY_PLAN_CACHES.with(|caches| {
201            let mut caches = caches.borrow_mut();
202            let cache = caches.entry(scope_id).or_default();
203
204            f(cache)
205        })
206    }
207
208    const fn visible_indexes_for_model(
209        model: &'static EntityModel,
210        visibility: QueryPlanVisibility,
211    ) -> VisibleIndexes<'static> {
212        match visibility {
213            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
214            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
215        }
216    }
217
218    #[cfg(test)]
219    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
220        self.with_query_plan_cache(|cache| cache.len())
221    }
222
223    #[cfg(test)]
224    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
225        self.with_query_plan_cache(QueryPlanCache::clear);
226    }
227
228    pub(in crate::db) fn query_plan_visibility_for_store_path(
229        &self,
230        store_path: &'static str,
231    ) -> Result<QueryPlanVisibility, QueryError> {
232        let store = self
233            .db
234            .recovered_store(store_path)
235            .map_err(QueryError::execute)?;
236        let visibility = if store.index_state() == crate::db::IndexState::Ready {
237            QueryPlanVisibility::StoreReady
238        } else {
239            QueryPlanVisibility::StoreNotReady
240        };
241
242        Ok(visibility)
243    }
244
245    pub(in crate::db) fn cached_query_plan_entry_for_authority(
246        &self,
247        authority: crate::db::executor::EntityAuthority,
248        schema_fingerprint: CommitSchemaFingerprint,
249        query: &StructuralQuery,
250    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
251        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
252        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
253        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
254        let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
255            authority,
256            schema_fingerprint,
257            visibility,
258            query,
259            normalized_predicate.as_ref(),
260        );
261
262        {
263            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
264            if let Some(entry) = cached {
265                return Ok((entry, QueryPlanCacheAttribution::hit()));
266            }
267        }
268
269        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
270            &visible_indexes,
271            normalized_predicate,
272        )?;
273        let entry = QueryPlanCacheEntry::new(
274            plan.clone(),
275            SharedPreparedExecutionPlan::from_plan(authority, plan),
276        );
277        self.with_query_plan_cache(|cache| {
278            cache.insert(cache_key, entry.clone());
279        });
280
281        Ok((entry, QueryPlanCacheAttribution::miss()))
282    }
283
284    #[cfg(test)]
285    pub(in crate::db) fn query_plan_cache_key_for_tests(
286        authority: crate::db::executor::EntityAuthority,
287        schema_fingerprint: CommitSchemaFingerprint,
288        visibility: QueryPlanVisibility,
289        query: &StructuralQuery,
290        cache_method_version: u8,
291    ) -> QueryPlanCacheKey {
292        QueryPlanCacheKey::for_authority_with_method_version(
293            authority,
294            schema_fingerprint,
295            visibility,
296            query,
297            cache_method_version,
298        )
299    }
300
301    pub(in crate::db) fn cached_structural_plan_for_authority(
302        &self,
303        authority: crate::db::executor::EntityAuthority,
304        schema_fingerprint: CommitSchemaFingerprint,
305        query: &StructuralQuery,
306    ) -> Result<AccessPlannedQuery, QueryError> {
307        let (entry, _) =
308            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
309
310        Ok(entry.logical_plan().clone())
311    }
312
313    // Resolve the planner-visible index slice for one typed query exactly once
314    // at the session boundary before handing execution/planning off to query-owned logic.
315    fn with_query_visible_indexes<E, T>(
316        &self,
317        query: &Query<E>,
318        op: impl FnOnce(
319            &Query<E>,
320            &crate::db::query::plan::VisibleIndexes<'static>,
321        ) -> Result<T, QueryError>,
322    ) -> Result<T, QueryError>
323    where
324        E: EntityKind<Canister = C>,
325    {
326        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
327        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
328
329        op(query, &visible_indexes)
330    }
331
332    // Resolve one typed structural query onto the shared lower plan cache so
333    // typed/fluent callers do not each duplicate the entity metadata plumbing.
334    fn cached_structural_plan_for_entity<E>(
335        &self,
336        query: &StructuralQuery,
337    ) -> Result<AccessPlannedQuery, QueryError>
338    where
339        E: EntityKind<Canister = C>,
340    {
341        self.cached_structural_plan_for_authority(
342            crate::db::executor::EntityAuthority::for_type::<E>(),
343            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
344            query,
345        )
346    }
347
348    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
349        &self,
350        query: &StructuralQuery,
351    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
352    where
353        E: EntityKind<Canister = C>,
354    {
355        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
356            crate::db::executor::EntityAuthority::for_type::<E>(),
357            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
358            query,
359        )?;
360
361        Ok((entry.typed_prepared_plan::<E>(), attribution))
362    }
363
364    // Compile one typed query using only the indexes currently visible for the
365    // query's recovered store.
366    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
367        &self,
368        query: &Query<E>,
369    ) -> Result<CompiledQuery<E>, QueryError>
370    where
371        E: EntityKind<Canister = C>,
372    {
373        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
374
375        Ok(Query::<E>::compiled_query_from_plan(plan))
376    }
377
378    // Build one logical planned-query shell using only the indexes currently
379    // visible for the query's recovered store.
380    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
381        &self,
382        query: &Query<E>,
383    ) -> Result<PlannedQuery<E>, QueryError>
384    where
385        E: EntityKind<Canister = C>,
386    {
387        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
388
389        Ok(Query::<E>::planned_query_from_plan(plan))
390    }
391
392    // Project one logical explain payload using only planner-visible indexes.
393    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
394        &self,
395        query: &Query<E>,
396    ) -> Result<ExplainPlan, QueryError>
397    where
398        E: EntityKind<Canister = C>,
399    {
400        self.with_query_visible_indexes(query, |query, visible_indexes| {
401            query.explain_with_visible_indexes(visible_indexes)
402        })
403    }
404
405    // Hash one typed query plan using only the indexes currently visible for
406    // the query's recovered store.
407    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
408        &self,
409        query: &Query<E>,
410    ) -> Result<String, QueryError>
411    where
412        E: EntityKind<Canister = C>,
413    {
414        self.with_query_visible_indexes(query, |query, visible_indexes| {
415            query.plan_hash_hex_with_visible_indexes(visible_indexes)
416        })
417    }
418
419    // Explain one load execution shape using only planner-visible
420    // indexes from the recovered store state.
421    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
422        &self,
423        query: &Query<E>,
424    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
425    where
426        E: EntityValue + EntityKind<Canister = C>,
427    {
428        self.with_query_visible_indexes(query, |query, visible_indexes| {
429            query.explain_execution_with_visible_indexes(visible_indexes)
430        })
431    }
432
433    // Render one load execution descriptor as deterministic text using
434    // only planner-visible indexes from the recovered store state.
435    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
436        &self,
437        query: &Query<E>,
438    ) -> Result<String, QueryError>
439    where
440        E: EntityValue + EntityKind<Canister = C>,
441    {
442        self.with_query_visible_indexes(query, |query, visible_indexes| {
443            query.explain_execution_text_with_visible_indexes(visible_indexes)
444        })
445    }
446
447    // Render one load execution descriptor as canonical JSON using
448    // only planner-visible indexes from the recovered store state.
449    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
450        &self,
451        query: &Query<E>,
452    ) -> Result<String, QueryError>
453    where
454        E: EntityValue + EntityKind<Canister = C>,
455    {
456        self.with_query_visible_indexes(query, |query, visible_indexes| {
457            query.explain_execution_json_with_visible_indexes(visible_indexes)
458        })
459    }
460
461    // Render one load execution descriptor plus route diagnostics using
462    // only planner-visible indexes from the recovered store state.
463    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
464        &self,
465        query: &Query<E>,
466    ) -> Result<String, QueryError>
467    where
468        E: EntityValue + EntityKind<Canister = C>,
469    {
470        self.with_query_visible_indexes(query, |query, visible_indexes| {
471            query.explain_execution_verbose_with_visible_indexes(visible_indexes)
472        })
473    }
474
475    // Explain one prepared fluent aggregate terminal using only
476    // planner-visible indexes from the recovered store state.
477    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
478        &self,
479        query: &Query<E>,
480        strategy: &S,
481    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
482    where
483        E: EntityValue + EntityKind<Canister = C>,
484        S: PreparedFluentAggregateExplainStrategy,
485    {
486        self.with_query_visible_indexes(query, |query, visible_indexes| {
487            query
488                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
489        })
490    }
491
492    // Explain one `bytes_by(field)` terminal using only planner-visible
493    // indexes from the recovered store state.
494    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
495        &self,
496        query: &Query<E>,
497        target_field: &str,
498    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
499    where
500        E: EntityValue + EntityKind<Canister = C>,
501    {
502        self.with_query_visible_indexes(query, |query, visible_indexes| {
503            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
504        })
505    }
506
507    // Explain one prepared fluent projection terminal using only
508    // planner-visible indexes from the recovered store state.
509    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
510        &self,
511        query: &Query<E>,
512        strategy: &PreparedFluentProjectionStrategy,
513    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
514    where
515        E: EntityValue + EntityKind<Canister = C>,
516    {
517        self.with_query_visible_indexes(query, |query, visible_indexes| {
518            query.explain_prepared_projection_terminal_with_visible_indexes(
519                visible_indexes,
520                strategy,
521            )
522        })
523    }
524
525    // Validate that one execution strategy is admissible for scalar paged load
526    // execution and fail closed on grouped/primary-key-only routes.
527    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
528        match family {
529            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
530                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
531            )),
532            ExecutionFamily::Ordered => Ok(()),
533            ExecutionFamily::Grouped => Err(QueryError::invariant(
534                "grouped queries execute via execute(), not page().execute()",
535            )),
536        }
537    }
538
539    // Validate that one execution strategy is admissible for the grouped
540    // execution surface.
541    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
542        match family {
543            ExecutionFamily::Grouped => Ok(()),
544            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
545                "grouped execution requires grouped logical plans",
546            )),
547        }
548    }
549
550    /// Execute one scalar load/delete query and return materialized response rows.
551    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
552    where
553        E: PersistedRow<Canister = C> + EntityValue,
554    {
555        // Phase 1: compile typed intent into one prepared execution-plan contract.
556        let mode = query.mode();
557        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
558
559        // Phase 2: delegate execution to the shared compiled-plan entry path.
560        self.execute_query_dyn(mode, plan)
561    }
562
563    /// Execute one typed query while reporting the compile/execute split at
564    /// the shared fluent query seam.
565    #[cfg(feature = "perf-attribution")]
566    #[doc(hidden)]
567    pub fn execute_query_result_with_attribution<E>(
568        &self,
569        query: &Query<E>,
570    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
571    where
572        E: PersistedRow<Canister = C> + EntityValue,
573    {
574        // Phase 1: measure compile work at the typed/fluent boundary,
575        // including the shared lower query-plan cache lookup/build exactly
576        // once. This preserves honest hit/miss attribution without
577        // double-building plans on one-shot cache misses.
578        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
579            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
580        });
581        let (plan, cache_attribution) = plan_and_cache?;
582
583        // Phase 2: execute one query result using the prepared plan produced
584        // by the compile/cache boundary above.
585        let (execute_local_instructions, result) = measure_query_stage(|| {
586            if query.has_grouping() {
587                self.execute_grouped_plan_with_trace(plan, None)
588                    .map(|(page, trace)| {
589                        let next_cursor = page
590                            .next_cursor
591                            .map(|token| {
592                                let Some(token) = token.as_grouped() else {
593                                    return Err(
594                                        QueryError::grouped_paged_emitted_scalar_continuation(),
595                                    );
596                                };
597
598                                token.encode().map_err(|err| {
599                                    QueryError::serialize_internal(format!(
600                                        "failed to serialize grouped continuation cursor: {err}"
601                                    ))
602                                })
603                            })
604                            .transpose()?;
605
606                        Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
607                            (
608                                LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
609                                    page.rows,
610                                    next_cursor,
611                                    trace,
612                                )),
613                                Self::empty_scalar_execute_phase_attribution(),
614                                0,
615                            ),
616                        )
617                    })?
618            } else {
619                match query.mode() {
620                    QueryMode::Load(_) => {
621                        let (rows, phase_attribution, response_decode_local_instructions) = self
622                            .load_executor::<E>()
623                            .execute_with_phase_attribution(plan)
624                            .map_err(QueryError::execute)?;
625
626                        Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
627                            (
628                                LoadQueryResult::Rows(rows),
629                                phase_attribution,
630                                response_decode_local_instructions,
631                            ),
632                        )
633                    }
634                    QueryMode::Delete(_) => {
635                        let result = self.execute_query_dyn(query.mode(), plan)?;
636
637                        Ok((
638                            LoadQueryResult::Rows(result),
639                            Self::empty_scalar_execute_phase_attribution(),
640                            0,
641                        ))
642                    }
643                }
644            }
645        });
646        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
647        let total_local_instructions =
648            compile_local_instructions.saturating_add(execute_local_instructions);
649
650        Ok((
651            result,
652            QueryExecutionAttribution {
653                compile_local_instructions,
654                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
655                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
656                direct_data_row_scan_local_instructions: execute_phase_attribution
657                    .direct_data_row_scan_local_instructions,
658                direct_data_row_key_stream_local_instructions: execute_phase_attribution
659                    .direct_data_row_key_stream_local_instructions,
660                direct_data_row_row_read_local_instructions: execute_phase_attribution
661                    .direct_data_row_row_read_local_instructions,
662                direct_data_row_key_encode_local_instructions: execute_phase_attribution
663                    .direct_data_row_key_encode_local_instructions,
664                direct_data_row_store_get_local_instructions: execute_phase_attribution
665                    .direct_data_row_store_get_local_instructions,
666                direct_data_row_order_window_local_instructions: execute_phase_attribution
667                    .direct_data_row_order_window_local_instructions,
668                direct_data_row_page_window_local_instructions: execute_phase_attribution
669                    .direct_data_row_page_window_local_instructions,
670                response_decode_local_instructions,
671                execute_local_instructions,
672                total_local_instructions,
673                shared_query_plan_cache_hits: cache_attribution.hits,
674                shared_query_plan_cache_misses: cache_attribution.misses,
675            },
676        ))
677    }
678
679    // Execute one typed query through the unified row/grouped result surface so
680    // higher layers do not need to branch on grouped shape themselves.
681    #[doc(hidden)]
682    pub fn execute_query_result<E>(
683        &self,
684        query: &Query<E>,
685    ) -> Result<LoadQueryResult<E>, QueryError>
686    where
687        E: PersistedRow<Canister = C> + EntityValue,
688    {
689        if query.has_grouping() {
690            return self
691                .execute_grouped(query, None)
692                .map(LoadQueryResult::Grouped);
693        }
694
695        self.execute_query(query).map(LoadQueryResult::Rows)
696    }
697
698    /// Execute one typed delete query and return only the affected-row count.
699    #[doc(hidden)]
700    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
701    where
702        E: PersistedRow<Canister = C> + EntityValue,
703    {
704        // Phase 1: fail closed if the caller routes a non-delete query here.
705        if !query.mode().is_delete() {
706            return Err(QueryError::unsupported_query(
707                "delete count execution requires delete query mode",
708            ));
709        }
710
711        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
712        let plan = self
713            .compile_query_with_visible_indexes(query)?
714            .into_prepared_execution_plan();
715
716        // Phase 3: execute the shared delete core while skipping response-row materialization.
717        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
718            .map_err(QueryError::execute)
719    }
720
721    /// Execute one scalar query from one pre-built prepared execution contract.
722    ///
723    /// This is the shared compiled-plan entry boundary used by the typed
724    /// `execute_query(...)` surface and adjacent query execution facades.
725    pub(in crate::db) fn execute_query_dyn<E>(
726        &self,
727        mode: QueryMode,
728        plan: PreparedExecutionPlan<E>,
729    ) -> Result<EntityResponse<E>, QueryError>
730    where
731        E: PersistedRow<Canister = C> + EntityValue,
732    {
733        let result = match mode {
734            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
735            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
736        };
737
738        result.map_err(QueryError::execute)
739    }
740
741    // Shared load-query terminal wrapper: build plan, run under metrics, map
742    // execution errors into query-facing errors.
743    pub(in crate::db) fn execute_load_query_with<E, T>(
744        &self,
745        query: &Query<E>,
746        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
747    ) -> Result<T, QueryError>
748    where
749        E: PersistedRow<Canister = C> + EntityValue,
750    {
751        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
752
753        self.with_metrics(|| op(self.load_executor::<E>(), plan))
754            .map_err(QueryError::execute)
755    }
756
757    /// Build one trace payload for a query without executing it.
758    ///
759    /// This lightweight surface is intended for developer diagnostics:
760    /// plan hash, access strategy summary, and planner/executor route shape.
761    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
762    where
763        E: EntityKind<Canister = C>,
764    {
765        let compiled = self.compile_query_with_visible_indexes(query)?;
766        let explain = compiled.explain();
767        let plan_hash = compiled.plan_hash_hex();
768
769        let (executable, _) =
770            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
771        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
772        let execution_family = match query.mode() {
773            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
774            QueryMode::Delete(_) => None,
775        };
776
777        Ok(QueryTracePlan::new(
778            plan_hash,
779            access_strategy,
780            execution_family,
781            explain,
782        ))
783    }
784
785    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
786    pub(crate) fn execute_load_query_paged_with_trace<E>(
787        &self,
788        query: &Query<E>,
789        cursor_token: Option<&str>,
790    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
791    where
792        E: PersistedRow<Canister = C> + EntityValue,
793    {
794        // Phase 1: build/validate prepared execution plan and reject grouped plans.
795        let plan = self
796            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
797            .0;
798        Self::ensure_scalar_paged_execution_family(
799            plan.execution_family().map_err(QueryError::execute)?,
800        )?;
801
802        // Phase 2: decode external cursor token and validate it against plan surface.
803        let cursor_bytes = decode_optional_cursor_token(cursor_token)
804            .map_err(QueryError::from_cursor_plan_error)?;
805        let cursor = plan
806            .prepare_cursor(cursor_bytes.as_deref())
807            .map_err(QueryError::from_executor_plan_error)?;
808
809        // Phase 3: execute one traced page and encode outbound continuation token.
810        let (page, trace) = self
811            .with_metrics(|| {
812                self.load_executor::<E>()
813                    .execute_paged_with_cursor_traced(plan, cursor)
814            })
815            .map_err(QueryError::execute)?;
816        let next_cursor = page
817            .next_cursor
818            .map(|token| {
819                let Some(token) = token.as_scalar() else {
820                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
821                };
822
823                token.encode().map_err(|err| {
824                    QueryError::serialize_internal(format!(
825                        "failed to serialize continuation cursor: {err}"
826                    ))
827                })
828            })
829            .transpose()?;
830
831        Ok(PagedLoadExecutionWithTrace::new(
832            page.items,
833            next_cursor,
834            trace,
835        ))
836    }
837
838    /// Execute one grouped query page with optional grouped continuation cursor.
839    ///
840    /// This is the explicit grouped execution boundary; scalar load APIs reject
841    /// grouped plans to preserve scalar response contracts.
842    pub(in crate::db) fn execute_grouped<E>(
843        &self,
844        query: &Query<E>,
845        cursor_token: Option<&str>,
846    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
847    where
848        E: PersistedRow<Canister = C> + EntityValue,
849    {
850        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
851        let next_cursor = page
852            .next_cursor
853            .map(|token| {
854                let Some(token) = token.as_grouped() else {
855                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
856                };
857
858                token.encode().map_err(|err| {
859                    QueryError::serialize_internal(format!(
860                        "failed to serialize grouped continuation cursor: {err}"
861                    ))
862                })
863            })
864            .transpose()?;
865
866        Ok(PagedGroupedExecutionWithTrace::new(
867            page.rows,
868            next_cursor,
869            trace,
870        ))
871    }
872
873    // Execute the canonical grouped query core and return the raw grouped page
874    // plus optional execution trace before outward cursor formatting.
875    fn execute_grouped_page_with_trace<E>(
876        &self,
877        query: &Query<E>,
878        cursor_token: Option<&str>,
879    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
880    where
881        E: PersistedRow<Canister = C> + EntityValue,
882    {
883        // Phase 1: build the prepared execution plan once from the typed query.
884        let plan = self
885            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
886            .0;
887
888        // Phase 2: reuse the shared prepared grouped execution path.
889        self.execute_grouped_plan_with_trace(plan, cursor_token)
890    }
891
892    // Execute one grouped prepared plan page with optional grouped cursor.
893    fn execute_grouped_plan_with_trace<E>(
894        &self,
895        plan: PreparedExecutionPlan<E>,
896        cursor_token: Option<&str>,
897    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
898    where
899        E: PersistedRow<Canister = C> + EntityValue,
900    {
901        // Phase 1: validate the prepared plan shape before decoding cursors.
902        Self::ensure_grouped_execution_family(
903            plan.execution_family().map_err(QueryError::execute)?,
904        )?;
905
906        // Phase 2: decode external grouped cursor token and validate against plan.
907        let cursor = decode_optional_grouped_cursor_token(cursor_token)
908            .map_err(QueryError::from_cursor_plan_error)?;
909        let cursor = plan
910            .prepare_grouped_cursor_token(cursor)
911            .map_err(QueryError::from_executor_plan_error)?;
912
913        // Phase 3: execute one grouped page while preserving the structural
914        // grouped cursor payload for whichever outward cursor format the caller needs.
915        self.with_metrics(|| {
916            self.load_executor::<E>()
917                .execute_grouped_paged_with_cursor_traced(plan, cursor)
918        })
919        .map_err(QueryError::execute)
920    }
921}
922
923impl QueryPlanCacheKey {
924    fn for_authority_with_normalized_predicate(
925        authority: crate::db::executor::EntityAuthority,
926        schema_fingerprint: CommitSchemaFingerprint,
927        visibility: QueryPlanVisibility,
928        query: &StructuralQuery,
929        normalized_predicate: Option<&crate::db::predicate::Predicate>,
930    ) -> Self {
931        Self::for_authority_with_normalized_predicate_and_method_version(
932            authority,
933            schema_fingerprint,
934            visibility,
935            query,
936            normalized_predicate,
937            SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
938        )
939    }
940
941    // Assemble the canonical cache-key shell once so the test and
942    // normalized-predicate constructors only decide which structural query key
943    // they feed into the shared session cache identity.
944    const fn from_authority_parts(
945        authority: crate::db::executor::EntityAuthority,
946        schema_fingerprint: CommitSchemaFingerprint,
947        visibility: QueryPlanVisibility,
948        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
949        cache_method_version: u8,
950    ) -> Self {
951        Self {
952            cache_method_version,
953            entity_path: authority.entity_path(),
954            schema_fingerprint,
955            visibility,
956            structural_query,
957        }
958    }
959
960    #[cfg(test)]
961    fn for_authority_with_method_version(
962        authority: crate::db::executor::EntityAuthority,
963        schema_fingerprint: CommitSchemaFingerprint,
964        visibility: QueryPlanVisibility,
965        query: &StructuralQuery,
966        cache_method_version: u8,
967    ) -> Self {
968        Self::from_authority_parts(
969            authority,
970            schema_fingerprint,
971            visibility,
972            query.structural_cache_key(),
973            cache_method_version,
974        )
975    }
976
977    fn for_authority_with_normalized_predicate_and_method_version(
978        authority: crate::db::executor::EntityAuthority,
979        schema_fingerprint: CommitSchemaFingerprint,
980        visibility: QueryPlanVisibility,
981        query: &StructuralQuery,
982        normalized_predicate: Option<&crate::db::predicate::Predicate>,
983        cache_method_version: u8,
984    ) -> Self {
985        Self::from_authority_parts(
986            authority,
987            schema_fingerprint,
988            visibility,
989            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
990            cache_method_version,
991        )
992    }
993}