Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10    db::{
11        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13        access::AccessStrategy,
14        commit::CommitSchemaFingerprint,
15        cursor::{
16            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17        },
18        diagnostics::ExecutionTrace,
19        executor::{
20            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21            SharedPreparedExecutionPlan,
22        },
23        query::builder::{
24            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25        },
26        query::explain::{
27            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28        },
29        query::{
30            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32        },
33    },
34    error::InternalError,
35    model::entity::EntityModel,
36    traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
/// Hasher factory shared by the in-heap query-plan cache maps in this module;
/// it builds default-constructed `Xxh3` hashers.
type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
// Bump this when the shared lower query-plan cache key meaning changes in a
// way that must force old in-heap entries to miss instead of aliasing.
// NOTE(review): presumably stamped into `QueryPlanCacheKey::cache_method_version`
// by the key constructors, which are defined outside this view — confirm.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
/// Index visibility granted to the planner for one recovered store.
///
/// The value is part of `QueryPlanCacheKey`, so plans built under different
/// visibility never share a cache entry.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
    /// The store's index state is not `Ready`; planning sees no indexes.
    StoreNotReady,
    /// The store's index state is `Ready`; planning sees the model's
    /// planner-visible indexes.
    StoreReady,
}
56
/// Identity of one entry in the shared in-heap query-plan cache.
///
/// Two keys alias only when every component below compares equal.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
    // Key-format version; keys built under different versions never compare equal.
    cache_method_version: u8,
    // NOTE(review): presumably the entity path taken from the authority by the
    // key constructors (defined outside this view) — confirm.
    entity_path: &'static str,
    // Commit schema fingerprint supplied by the caller for the queried entity.
    schema_fingerprint: CommitSchemaFingerprint,
    // Index visibility the plan was (or will be) built under.
    visibility: QueryPlanVisibility,
    // Structural identity of the query intent itself.
    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
65
/// One cached planning result: the logical access plan together with the
/// shared prepared execution plan built from it.
#[derive(Clone, Debug)]
pub(in crate::db) struct QueryPlanCacheEntry {
    // Logical plan handed back to compile/planned-query callers.
    logical_plan: AccessPlannedQuery,
    // Entity-erased prepared plan; re-typed per entity via `typed_prepared_plan`.
    prepared_plan: SharedPreparedExecutionPlan,
}
71
impl QueryPlanCacheEntry {
    /// Bundle one logical plan with the prepared execution plan derived from it.
    #[must_use]
    pub(in crate::db) const fn new(
        logical_plan: AccessPlannedQuery,
        prepared_plan: SharedPreparedExecutionPlan,
    ) -> Self {
        Self {
            logical_plan,
            prepared_plan,
        }
    }

    /// Borrow the cached logical plan.
    #[must_use]
    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
        &self.logical_plan
    }

    /// Produce a prepared execution plan typed for entity `E` by cloning the
    /// shared prepared plan held in this entry.
    #[must_use]
    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
        self.prepared_plan.typed_clone::<E>()
    }
}
94
/// Map from query-plan cache key to cached planning result, hashed with the
/// module's shared `CacheBuildHasher`.
pub(in crate::db) type QueryPlanCache =
    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
97
thread_local! {
    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
    // facades can share prepared logical plans across update/query calls while
    // tests and multi-registry host processes remain isolated by registry
    // identity.
    //
    // The outer map is keyed by the `usize` scope id that sessions obtain from
    // `self.db.cache_scope_id()`; inner caches are created lazily on first use.
    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
106
/// Hit/miss tally reported by one shared query-plan cache resolution.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
    /// Number of lookups served from an existing cache entry.
    pub hits: u64,
    /// Number of lookups that had to build and insert a new entry.
    pub misses: u64,
}
112
impl QueryPlanCacheAttribution {
    /// Attribution for a single lookup that found a cached entry.
    #[must_use]
    const fn hit() -> Self {
        Self { hits: 1, misses: 0 }
    }

    /// Attribution for a single lookup that had to build a new entry.
    #[must_use]
    const fn miss() -> Self {
        Self { hits: 0, misses: 1 }
    }
}
124
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution records the top-level compile/execute split for
/// typed/fluent query execution at the session boundary.
///
/// All `*_local_instructions` fields are deltas of the local instruction
/// counter; that counter always reads `0` on non-wasm32 targets.
///
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Cost of resolving the prepared plan, including the shared query-plan
    /// cache lookup or build.
    pub compile_local_instructions: u64,
    /// Copied from the executor's scalar phase attribution; zero for grouped
    /// and delete executions.
    pub runtime_local_instructions: u64,
    /// Copied from the executor's scalar phase attribution; zero for grouped
    /// and delete executions.
    pub finalize_local_instructions: u64,
    // The `direct_data_row_*` fields below are likewise copied verbatim from
    // the executor's scalar phase attribution and are zero for grouped and
    // delete executions.
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    /// Response decode cost reported by the load executor; zero for grouped
    /// and delete executions.
    pub response_decode_local_instructions: u64,
    /// Cost measured around the whole execution stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and
    /// `execute_local_instructions`.
    pub total_local_instructions: u64,
    /// Shared query-plan cache hits observed while resolving the plan.
    pub shared_query_plan_cache_hits: u64,
    /// Shared query-plan cache misses observed while resolving the plan.
    pub shared_query_plan_cache_misses: u64,
}
150
/// Read the local instruction counter used for query stage attribution.
///
/// On `wasm32` this reads performance counter `1` through `canic_cdk`. On
/// every other target there is no counter to read and the function returns
/// `0`, so every measured delta is zero off-wasm.
#[cfg(feature = "perf-attribution")]
// The lint only fires on non-wasm32 targets, where the body is a plain
// constant. Gating the expectation keeps wasm32 clippy runs free of
// `unfulfilled_lint_expectations`.
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
167
/// Run one fallible query stage and report how many local instructions it
/// consumed.
///
/// Returns the counter delta (saturating at zero) together with the stage's
/// untouched `Result`, so callers can record the cost before propagating any
/// error.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let counter_before = read_query_local_instruction_counter();
    let outcome = run();
    let counter_after = read_query_local_instruction_counter();

    (counter_after.saturating_sub(counter_before), outcome)
}
176
177impl<C: CanisterKind> DbSession<C> {
    /// All-zero scalar phase attribution.
    ///
    /// Used by `execute_query_result_with_attribution` for grouped results and
    /// delete queries, where no scalar phase breakdown is collected.
    #[cfg(feature = "perf-attribution")]
    const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
        ScalarExecutePhaseAttribution {
            runtime_local_instructions: 0,
            finalize_local_instructions: 0,
            direct_data_row_scan_local_instructions: 0,
            direct_data_row_key_stream_local_instructions: 0,
            direct_data_row_row_read_local_instructions: 0,
            direct_data_row_key_encode_local_instructions: 0,
            direct_data_row_store_get_local_instructions: 0,
            direct_data_row_order_window_local_instructions: 0,
            direct_data_row_page_window_local_instructions: 0,
        }
    }
192
    // Scope id selecting this session's cache inside the thread-local
    // `QUERY_PLAN_CACHES` registry; delegated to the owning db handle.
    fn query_plan_cache_scope_id(&self) -> usize {
        self.db.cache_scope_id()
    }
196
197    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
198        let scope_id = self.query_plan_cache_scope_id();
199
200        QUERY_PLAN_CACHES.with(|caches| {
201            let mut caches = caches.borrow_mut();
202            let cache = caches.entry(scope_id).or_default();
203
204            f(cache)
205        })
206    }
207
208    const fn visible_indexes_for_model(
209        model: &'static EntityModel,
210        visibility: QueryPlanVisibility,
211    ) -> VisibleIndexes<'static> {
212        match visibility {
213            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
214            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
215        }
216    }
217
    /// Test-only: number of entries currently held in this session's
    /// query-plan cache scope.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
        self.with_query_plan_cache(|cache| cache.len())
    }
222
    /// Test-only: drop every entry from this session's query-plan cache scope.
    #[cfg(test)]
    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
        self.with_query_plan_cache(QueryPlanCache::clear);
    }
227
228    pub(in crate::db) fn query_plan_visibility_for_store_path(
229        &self,
230        store_path: &'static str,
231    ) -> Result<QueryPlanVisibility, QueryError> {
232        let store = self
233            .db
234            .recovered_store(store_path)
235            .map_err(QueryError::execute)?;
236        let visibility = if store.index_state() == crate::db::IndexState::Ready {
237            QueryPlanVisibility::StoreReady
238        } else {
239            QueryPlanVisibility::StoreNotReady
240        };
241
242        Ok(visibility)
243    }
244
245    pub(in crate::db) fn cached_query_plan_entry_for_authority(
246        &self,
247        authority: crate::db::executor::EntityAuthority,
248        schema_fingerprint: CommitSchemaFingerprint,
249        query: &StructuralQuery,
250    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
251        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
252        let cache_key =
253            QueryPlanCacheKey::for_authority(authority, schema_fingerprint, visibility, query);
254
255        {
256            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
257            if let Some(entry) = cached {
258                return Ok((entry, QueryPlanCacheAttribution::hit()));
259            }
260        }
261
262        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
263        let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
264        let entry = QueryPlanCacheEntry::new(
265            plan.clone(),
266            SharedPreparedExecutionPlan::from_plan(authority, plan),
267        );
268        self.with_query_plan_cache(|cache| {
269            cache.insert(cache_key, entry.clone());
270        });
271
272        Ok((entry, QueryPlanCacheAttribution::miss()))
273    }
274
    /// Test-only: build a cache key with an explicit `cache_method_version`
    /// by forwarding every argument to
    /// `QueryPlanCacheKey::for_authority_with_method_version`.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_key_for_tests(
        authority: crate::db::executor::EntityAuthority,
        schema_fingerprint: CommitSchemaFingerprint,
        visibility: QueryPlanVisibility,
        query: &StructuralQuery,
        cache_method_version: u8,
    ) -> QueryPlanCacheKey {
        QueryPlanCacheKey::for_authority_with_method_version(
            authority,
            schema_fingerprint,
            visibility,
            query,
            cache_method_version,
        )
    }
291
292    pub(in crate::db) fn cached_structural_plan_for_authority(
293        &self,
294        authority: crate::db::executor::EntityAuthority,
295        schema_fingerprint: CommitSchemaFingerprint,
296        query: &StructuralQuery,
297    ) -> Result<AccessPlannedQuery, QueryError> {
298        let (entry, _) =
299            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
300
301        Ok(entry.logical_plan().clone())
302    }
303
304    // Resolve the planner-visible index slice for one typed query exactly once
305    // at the session boundary before handing execution/planning off to query-owned logic.
306    fn with_query_visible_indexes<E, T>(
307        &self,
308        query: &Query<E>,
309        op: impl FnOnce(
310            &Query<E>,
311            &crate::db::query::plan::VisibleIndexes<'static>,
312        ) -> Result<T, QueryError>,
313    ) -> Result<T, QueryError>
314    where
315        E: EntityKind<Canister = C>,
316    {
317        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
318        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
319
320        op(query, &visible_indexes)
321    }
322
323    // Resolve one typed structural query onto the shared lower plan cache so
324    // typed/fluent callers do not each duplicate the entity metadata plumbing.
325    fn cached_structural_plan_for_entity<E>(
326        &self,
327        query: &StructuralQuery,
328    ) -> Result<AccessPlannedQuery, QueryError>
329    where
330        E: EntityKind<Canister = C>,
331    {
332        self.cached_structural_plan_for_authority(
333            crate::db::executor::EntityAuthority::for_type::<E>(),
334            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
335            query,
336        )
337    }
338
339    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
340        &self,
341        query: &StructuralQuery,
342    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
343    where
344        E: EntityKind<Canister = C>,
345    {
346        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
347            crate::db::executor::EntityAuthority::for_type::<E>(),
348            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
349            query,
350        )?;
351
352        Ok((entry.typed_prepared_plan::<E>(), attribution))
353    }
354
355    // Compile one typed query using only the indexes currently visible for the
356    // query's recovered store.
357    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
358        &self,
359        query: &Query<E>,
360    ) -> Result<CompiledQuery<E>, QueryError>
361    where
362        E: EntityKind<Canister = C>,
363    {
364        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
365
366        Ok(Query::<E>::compiled_query_from_plan(plan))
367    }
368
369    // Build one logical planned-query shell using only the indexes currently
370    // visible for the query's recovered store.
371    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
372        &self,
373        query: &Query<E>,
374    ) -> Result<PlannedQuery<E>, QueryError>
375    where
376        E: EntityKind<Canister = C>,
377    {
378        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
379
380        Ok(Query::<E>::planned_query_from_plan(plan))
381    }
382
383    // Project one logical explain payload using only planner-visible indexes.
384    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
385        &self,
386        query: &Query<E>,
387    ) -> Result<ExplainPlan, QueryError>
388    where
389        E: EntityKind<Canister = C>,
390    {
391        self.with_query_visible_indexes(query, |query, visible_indexes| {
392            query.explain_with_visible_indexes(visible_indexes)
393        })
394    }
395
396    // Hash one typed query plan using only the indexes currently visible for
397    // the query's recovered store.
398    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
399        &self,
400        query: &Query<E>,
401    ) -> Result<String, QueryError>
402    where
403        E: EntityKind<Canister = C>,
404    {
405        self.with_query_visible_indexes(query, |query, visible_indexes| {
406            query.plan_hash_hex_with_visible_indexes(visible_indexes)
407        })
408    }
409
410    // Explain one load execution shape using only planner-visible
411    // indexes from the recovered store state.
412    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
413        &self,
414        query: &Query<E>,
415    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
416    where
417        E: EntityValue + EntityKind<Canister = C>,
418    {
419        self.with_query_visible_indexes(query, |query, visible_indexes| {
420            query.explain_execution_with_visible_indexes(visible_indexes)
421        })
422    }
423
424    // Render one load execution descriptor as deterministic text using
425    // only planner-visible indexes from the recovered store state.
426    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
427        &self,
428        query: &Query<E>,
429    ) -> Result<String, QueryError>
430    where
431        E: EntityValue + EntityKind<Canister = C>,
432    {
433        self.with_query_visible_indexes(query, |query, visible_indexes| {
434            query.explain_execution_text_with_visible_indexes(visible_indexes)
435        })
436    }
437
438    // Render one load execution descriptor as canonical JSON using
439    // only planner-visible indexes from the recovered store state.
440    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
441        &self,
442        query: &Query<E>,
443    ) -> Result<String, QueryError>
444    where
445        E: EntityValue + EntityKind<Canister = C>,
446    {
447        self.with_query_visible_indexes(query, |query, visible_indexes| {
448            query.explain_execution_json_with_visible_indexes(visible_indexes)
449        })
450    }
451
452    // Render one load execution descriptor plus route diagnostics using
453    // only planner-visible indexes from the recovered store state.
454    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
455        &self,
456        query: &Query<E>,
457    ) -> Result<String, QueryError>
458    where
459        E: EntityValue + EntityKind<Canister = C>,
460    {
461        self.with_query_visible_indexes(query, |query, visible_indexes| {
462            query.explain_execution_verbose_with_visible_indexes(visible_indexes)
463        })
464    }
465
466    // Explain one prepared fluent aggregate terminal using only
467    // planner-visible indexes from the recovered store state.
468    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
469        &self,
470        query: &Query<E>,
471        strategy: &S,
472    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
473    where
474        E: EntityValue + EntityKind<Canister = C>,
475        S: PreparedFluentAggregateExplainStrategy,
476    {
477        self.with_query_visible_indexes(query, |query, visible_indexes| {
478            query
479                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
480        })
481    }
482
483    // Explain one `bytes_by(field)` terminal using only planner-visible
484    // indexes from the recovered store state.
485    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
486        &self,
487        query: &Query<E>,
488        target_field: &str,
489    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
490    where
491        E: EntityValue + EntityKind<Canister = C>,
492    {
493        self.with_query_visible_indexes(query, |query, visible_indexes| {
494            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
495        })
496    }
497
498    // Explain one prepared fluent projection terminal using only
499    // planner-visible indexes from the recovered store state.
500    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
501        &self,
502        query: &Query<E>,
503        strategy: &PreparedFluentProjectionStrategy,
504    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
505    where
506        E: EntityValue + EntityKind<Canister = C>,
507    {
508        self.with_query_visible_indexes(query, |query, visible_indexes| {
509            query.explain_prepared_projection_terminal_with_visible_indexes(
510                visible_indexes,
511                strategy,
512            )
513        })
514    }
515
516    // Validate that one execution strategy is admissible for scalar paged load
517    // execution and fail closed on grouped/primary-key-only routes.
518    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
519        match family {
520            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
521                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
522            )),
523            ExecutionFamily::Ordered => Ok(()),
524            ExecutionFamily::Grouped => Err(QueryError::invariant(
525                "grouped queries execute via execute(), not page().execute()",
526            )),
527        }
528    }
529
530    // Validate that one execution strategy is admissible for the grouped
531    // execution surface.
532    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
533        match family {
534            ExecutionFamily::Grouped => Ok(()),
535            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
536                "grouped execution requires grouped logical plans",
537            )),
538        }
539    }
540
541    /// Execute one scalar load/delete query and return materialized response rows.
542    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
543    where
544        E: PersistedRow<Canister = C> + EntityValue,
545    {
546        // Phase 1: compile typed intent into one prepared execution-plan contract.
547        let mode = query.mode();
548        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
549
550        // Phase 2: delegate execution to the shared compiled-plan entry path.
551        self.execute_query_dyn(mode, plan)
552    }
553
    /// Execute one typed query while reporting the compile/execute split at
    /// the shared fluent query seam.
    ///
    /// Returns the unified row/grouped result together with a
    /// `QueryExecutionAttribution`. Grouped and delete executions report an
    /// all-zero scalar phase breakdown and zero response-decode cost; only
    /// scalar loads carry the executor's per-phase numbers.
    #[cfg(feature = "perf-attribution")]
    #[doc(hidden)]
    pub fn execute_query_result_with_attribution<E>(
        &self,
        query: &Query<E>,
    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure compile work at the typed/fluent boundary,
        // including the shared lower query-plan cache lookup/build exactly
        // once. This preserves honest hit/miss attribution without
        // double-building plans on one-shot cache misses.
        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
        });
        let (plan, cache_attribution) = plan_and_cache?;

        // Phase 2: execute one query result using the prepared plan produced
        // by the compile/cache boundary above.
        let (execute_local_instructions, result) = measure_query_stage(|| {
            if query.has_grouping() {
                // Grouped path: run without an inbound cursor and re-encode any
                // outbound continuation token for the response.
                self.execute_grouped_plan_with_trace(plan, None)
                    .map(|(page, trace)| {
                        let next_cursor = page
                            .next_cursor
                            .map(|token| {
                                // A grouped page must never hand back a scalar
                                // continuation token.
                                let Some(token) = token.as_grouped() else {
                                    return Err(
                                        QueryError::grouped_paged_emitted_scalar_continuation(),
                                    );
                                };

                                token.encode().map_err(|err| {
                                    QueryError::serialize_internal(format!(
                                        "failed to serialize grouped continuation cursor: {err}"
                                    ))
                                })
                            })
                            .transpose()?;

                        Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
                            (
                                LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
                                    page.rows,
                                    next_cursor,
                                    trace,
                                )),
                                Self::empty_scalar_execute_phase_attribution(),
                                0,
                            ),
                        )
                    })?
            } else {
                match query.mode() {
                    // Scalar load: the executor reports its own phase breakdown
                    // and response decode cost.
                    QueryMode::Load(_) => {
                        let (rows, phase_attribution, response_decode_local_instructions) = self
                            .load_executor::<E>()
                            .execute_with_phase_attribution(plan)
                            .map_err(QueryError::execute)?;

                        Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
                            (
                                LoadQueryResult::Rows(rows),
                                phase_attribution,
                                response_decode_local_instructions,
                            ),
                        )
                    }
                    // Delete: reuse the shared execution entry point; no phase
                    // breakdown is collected.
                    QueryMode::Delete(_) => {
                        let result = self.execute_query_dyn(query.mode(), plan)?;

                        Ok((
                            LoadQueryResult::Rows(result),
                            Self::empty_scalar_execute_phase_attribution(),
                            0,
                        ))
                    }
                }
            }
        });
        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
        // Total is the saturating sum of the two measured stages.
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);

        Ok((
            result,
            QueryExecutionAttribution {
                compile_local_instructions,
                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
                direct_data_row_scan_local_instructions: execute_phase_attribution
                    .direct_data_row_scan_local_instructions,
                direct_data_row_key_stream_local_instructions: execute_phase_attribution
                    .direct_data_row_key_stream_local_instructions,
                direct_data_row_row_read_local_instructions: execute_phase_attribution
                    .direct_data_row_row_read_local_instructions,
                direct_data_row_key_encode_local_instructions: execute_phase_attribution
                    .direct_data_row_key_encode_local_instructions,
                direct_data_row_store_get_local_instructions: execute_phase_attribution
                    .direct_data_row_store_get_local_instructions,
                direct_data_row_order_window_local_instructions: execute_phase_attribution
                    .direct_data_row_order_window_local_instructions,
                direct_data_row_page_window_local_instructions: execute_phase_attribution
                    .direct_data_row_page_window_local_instructions,
                response_decode_local_instructions,
                execute_local_instructions,
                total_local_instructions,
                shared_query_plan_cache_hits: cache_attribution.hits,
                shared_query_plan_cache_misses: cache_attribution.misses,
            },
        ))
    }
669
670    // Execute one typed query through the unified row/grouped result surface so
671    // higher layers do not need to branch on grouped shape themselves.
672    #[doc(hidden)]
673    pub fn execute_query_result<E>(
674        &self,
675        query: &Query<E>,
676    ) -> Result<LoadQueryResult<E>, QueryError>
677    where
678        E: PersistedRow<Canister = C> + EntityValue,
679    {
680        if query.has_grouping() {
681            return self
682                .execute_grouped(query, None)
683                .map(LoadQueryResult::Grouped);
684        }
685
686        self.execute_query(query).map(LoadQueryResult::Rows)
687    }
688
689    /// Execute one typed delete query and return only the affected-row count.
690    #[doc(hidden)]
691    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
692    where
693        E: PersistedRow<Canister = C> + EntityValue,
694    {
695        // Phase 1: fail closed if the caller routes a non-delete query here.
696        if !query.mode().is_delete() {
697            return Err(QueryError::unsupported_query(
698                "delete count execution requires delete query mode",
699            ));
700        }
701
702        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
703        let plan = self
704            .compile_query_with_visible_indexes(query)?
705            .into_prepared_execution_plan();
706
707        // Phase 3: execute the shared delete core while skipping response-row materialization.
708        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
709            .map_err(QueryError::execute)
710    }
711
712    /// Execute one scalar query from one pre-built prepared execution contract.
713    ///
714    /// This is the shared compiled-plan entry boundary used by the typed
715    /// `execute_query(...)` surface and adjacent query execution facades.
716    pub(in crate::db) fn execute_query_dyn<E>(
717        &self,
718        mode: QueryMode,
719        plan: PreparedExecutionPlan<E>,
720    ) -> Result<EntityResponse<E>, QueryError>
721    where
722        E: PersistedRow<Canister = C> + EntityValue,
723    {
724        let result = match mode {
725            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
726            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
727        };
728
729        result.map_err(QueryError::execute)
730    }
731
732    // Shared load-query terminal wrapper: build plan, run under metrics, map
733    // execution errors into query-facing errors.
734    pub(in crate::db) fn execute_load_query_with<E, T>(
735        &self,
736        query: &Query<E>,
737        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
738    ) -> Result<T, QueryError>
739    where
740        E: PersistedRow<Canister = C> + EntityValue,
741    {
742        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
743
744        self.with_metrics(|| op(self.load_executor::<E>(), plan))
745            .map_err(QueryError::execute)
746    }
747
748    /// Build one trace payload for a query without executing it.
749    ///
750    /// This lightweight surface is intended for developer diagnostics:
751    /// plan hash, access strategy summary, and planner/executor route shape.
752    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
753    where
754        E: EntityKind<Canister = C>,
755    {
756        let compiled = self.compile_query_with_visible_indexes(query)?;
757        let explain = compiled.explain();
758        let plan_hash = compiled.plan_hash_hex();
759
760        let (executable, _) =
761            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
762        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
763        let execution_family = match query.mode() {
764            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
765            QueryMode::Delete(_) => None,
766        };
767
768        Ok(QueryTracePlan::new(
769            plan_hash,
770            access_strategy,
771            execution_family,
772            explain,
773        ))
774    }
775
776    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
777    pub(crate) fn execute_load_query_paged_with_trace<E>(
778        &self,
779        query: &Query<E>,
780        cursor_token: Option<&str>,
781    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
782    where
783        E: PersistedRow<Canister = C> + EntityValue,
784    {
785        // Phase 1: build/validate prepared execution plan and reject grouped plans.
786        let plan = self
787            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
788            .0;
789        Self::ensure_scalar_paged_execution_family(
790            plan.execution_family().map_err(QueryError::execute)?,
791        )?;
792
793        // Phase 2: decode external cursor token and validate it against plan surface.
794        let cursor_bytes = decode_optional_cursor_token(cursor_token)
795            .map_err(QueryError::from_cursor_plan_error)?;
796        let cursor = plan
797            .prepare_cursor(cursor_bytes.as_deref())
798            .map_err(QueryError::from_executor_plan_error)?;
799
800        // Phase 3: execute one traced page and encode outbound continuation token.
801        let (page, trace) = self
802            .with_metrics(|| {
803                self.load_executor::<E>()
804                    .execute_paged_with_cursor_traced(plan, cursor)
805            })
806            .map_err(QueryError::execute)?;
807        let next_cursor = page
808            .next_cursor
809            .map(|token| {
810                let Some(token) = token.as_scalar() else {
811                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
812                };
813
814                token.encode().map_err(|err| {
815                    QueryError::serialize_internal(format!(
816                        "failed to serialize continuation cursor: {err}"
817                    ))
818                })
819            })
820            .transpose()?;
821
822        Ok(PagedLoadExecutionWithTrace::new(
823            page.items,
824            next_cursor,
825            trace,
826        ))
827    }
828
829    /// Execute one grouped query page with optional grouped continuation cursor.
830    ///
831    /// This is the explicit grouped execution boundary; scalar load APIs reject
832    /// grouped plans to preserve scalar response contracts.
833    pub(in crate::db) fn execute_grouped<E>(
834        &self,
835        query: &Query<E>,
836        cursor_token: Option<&str>,
837    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
838    where
839        E: PersistedRow<Canister = C> + EntityValue,
840    {
841        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
842        let next_cursor = page
843            .next_cursor
844            .map(|token| {
845                let Some(token) = token.as_grouped() else {
846                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
847                };
848
849                token.encode().map_err(|err| {
850                    QueryError::serialize_internal(format!(
851                        "failed to serialize grouped continuation cursor: {err}"
852                    ))
853                })
854            })
855            .transpose()?;
856
857        Ok(PagedGroupedExecutionWithTrace::new(
858            page.rows,
859            next_cursor,
860            trace,
861        ))
862    }
863
864    // Execute the canonical grouped query core and return the raw grouped page
865    // plus optional execution trace before outward cursor formatting.
866    fn execute_grouped_page_with_trace<E>(
867        &self,
868        query: &Query<E>,
869        cursor_token: Option<&str>,
870    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
871    where
872        E: PersistedRow<Canister = C> + EntityValue,
873    {
874        // Phase 1: build the prepared execution plan once from the typed query.
875        let plan = self
876            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
877            .0;
878
879        // Phase 2: reuse the shared prepared grouped execution path.
880        self.execute_grouped_plan_with_trace(plan, cursor_token)
881    }
882
883    // Execute one grouped prepared plan page with optional grouped cursor.
884    fn execute_grouped_plan_with_trace<E>(
885        &self,
886        plan: PreparedExecutionPlan<E>,
887        cursor_token: Option<&str>,
888    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
889    where
890        E: PersistedRow<Canister = C> + EntityValue,
891    {
892        // Phase 1: validate the prepared plan shape before decoding cursors.
893        Self::ensure_grouped_execution_family(
894            plan.execution_family().map_err(QueryError::execute)?,
895        )?;
896
897        // Phase 2: decode external grouped cursor token and validate against plan.
898        let cursor = decode_optional_grouped_cursor_token(cursor_token)
899            .map_err(QueryError::from_cursor_plan_error)?;
900        let cursor = plan
901            .prepare_grouped_cursor_token(cursor)
902            .map_err(QueryError::from_executor_plan_error)?;
903
904        // Phase 3: execute one grouped page while preserving the structural
905        // grouped cursor payload for whichever outward cursor format the caller needs.
906        self.with_metrics(|| {
907            self.load_executor::<E>()
908                .execute_grouped_paged_with_cursor_traced(plan, cursor)
909        })
910        .map_err(QueryError::execute)
911    }
912}
913
914impl QueryPlanCacheKey {
915    fn for_authority(
916        authority: crate::db::executor::EntityAuthority,
917        schema_fingerprint: CommitSchemaFingerprint,
918        visibility: QueryPlanVisibility,
919        query: &StructuralQuery,
920    ) -> Self {
921        Self::for_authority_with_method_version(
922            authority,
923            schema_fingerprint,
924            visibility,
925            query,
926            SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
927        )
928    }
929
930    fn for_authority_with_method_version(
931        authority: crate::db::executor::EntityAuthority,
932        schema_fingerprint: CommitSchemaFingerprint,
933        visibility: QueryPlanVisibility,
934        query: &StructuralQuery,
935        cache_method_version: u8,
936    ) -> Self {
937        Self {
938            cache_method_version,
939            entity_path: authority.entity_path(),
940            schema_fingerprint,
941            visibility,
942            structural_query: query.structural_cache_key(),
943        }
944    }
945}