Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10    db::{
11        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13        access::AccessStrategy,
14        commit::CommitSchemaFingerprint,
15        cursor::{
16            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17        },
18        diagnostics::ExecutionTrace,
19        executor::{
20            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21            SharedPreparedExecutionPlan,
22        },
23        query::builder::{
24            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25        },
26        query::explain::{
27            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28        },
29        query::{
30            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32        },
33    },
34    error::InternalError,
35    model::entity::EntityModel,
36    traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
45type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
// Method version for the shared lower query-plan cache key. Increment it
// whenever the meaning of the key changes, so entries already held in heap
// miss on lookup instead of aliasing entries built under the new meaning.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
51#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) enum QueryPlanVisibility {
53    StoreNotReady,
54    StoreReady,
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
58pub(in crate::db) struct QueryPlanCacheKey {
59    cache_method_version: u8,
60    entity_path: &'static str,
61    schema_fingerprint: CommitSchemaFingerprint,
62    visibility: QueryPlanVisibility,
63    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
64}
65
66#[derive(Clone, Debug)]
67pub(in crate::db) struct QueryPlanCacheEntry {
68    logical_plan: AccessPlannedQuery,
69    prepared_plan: SharedPreparedExecutionPlan,
70}
71
72impl QueryPlanCacheEntry {
73    #[must_use]
74    pub(in crate::db) const fn new(
75        logical_plan: AccessPlannedQuery,
76        prepared_plan: SharedPreparedExecutionPlan,
77    ) -> Self {
78        Self {
79            logical_plan,
80            prepared_plan,
81        }
82    }
83
84    #[must_use]
85    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
86        &self.logical_plan
87    }
88
89    #[must_use]
90    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
91        self.prepared_plan.typed_clone::<E>()
92    }
93
94    #[must_use]
95    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
96        &self.prepared_plan
97    }
98}
99
100pub(in crate::db) type QueryPlanCache =
101    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
102
103thread_local! {
104    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
105    // facades can share prepared logical plans across update/query calls while
106    // tests and multi-registry host processes remain isolated by registry
107    // identity.
108    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
109        RefCell::new(HashMap::default());
110}
111
112#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
113pub(in crate::db) struct QueryPlanCacheAttribution {
114    pub hits: u64,
115    pub misses: u64,
116}
117
118impl QueryPlanCacheAttribution {
119    #[must_use]
120    const fn hit() -> Self {
121        Self { hits: 1, misses: 0 }
122    }
123
124    #[must_use]
125    const fn miss() -> Self {
126        Self { hits: 0, misses: 1 }
127    }
128}
129
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution carries the top-level compile/execute split for
/// typed/fluent query execution measured at the session boundary, the scalar
/// execute-phase counters reported by the executor, and the shared
/// query-plan cache hit/miss counts for the same call.
///
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    pub compile_local_instructions: u64,
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    pub response_decode_local_instructions: u64,
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
155
// Read the local instruction counter used for query perf attribution.
//
// On wasm32 this reads performance counter `1` through the CDK. On every
// other target there is nothing to read and the function reports `0`.
//
// The lint expectation is scoped to non-wasm32 targets: only there is the
// body trivially const-eligible. On wasm32 the lint never fires, so an
// unconditional `expect` would itself be reported as unfulfilled.
#[cfg(feature = "perf-attribution")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
172
// Run one fallible stage and report how far the local instruction counter
// advanced while it ran, alongside the stage's own result. The delta
// saturates at zero rather than wrapping.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let counter_before = read_query_local_instruction_counter();
    let outcome = run();
    let counter_after = read_query_local_instruction_counter();

    (counter_after.saturating_sub(counter_before), outcome)
}
181
182impl<C: CanisterKind> DbSession<C> {
183    #[cfg(feature = "perf-attribution")]
184    const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
185        ScalarExecutePhaseAttribution {
186            runtime_local_instructions: 0,
187            finalize_local_instructions: 0,
188            direct_data_row_scan_local_instructions: 0,
189            direct_data_row_key_stream_local_instructions: 0,
190            direct_data_row_row_read_local_instructions: 0,
191            direct_data_row_key_encode_local_instructions: 0,
192            direct_data_row_store_get_local_instructions: 0,
193            direct_data_row_order_window_local_instructions: 0,
194            direct_data_row_page_window_local_instructions: 0,
195        }
196    }
197
198    fn query_plan_cache_scope_id(&self) -> usize {
199        self.db.cache_scope_id()
200    }
201
202    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
203        let scope_id = self.query_plan_cache_scope_id();
204
205        QUERY_PLAN_CACHES.with(|caches| {
206            let mut caches = caches.borrow_mut();
207            let cache = caches.entry(scope_id).or_default();
208
209            f(cache)
210        })
211    }
212
213    const fn visible_indexes_for_model(
214        model: &'static EntityModel,
215        visibility: QueryPlanVisibility,
216    ) -> VisibleIndexes<'static> {
217        match visibility {
218            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
219            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
220        }
221    }
222
223    #[cfg(test)]
224    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
225        self.with_query_plan_cache(|cache| cache.len())
226    }
227
228    #[cfg(test)]
229    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
230        self.with_query_plan_cache(QueryPlanCache::clear);
231    }
232
233    pub(in crate::db) fn query_plan_visibility_for_store_path(
234        &self,
235        store_path: &'static str,
236    ) -> Result<QueryPlanVisibility, QueryError> {
237        let store = self
238            .db
239            .recovered_store(store_path)
240            .map_err(QueryError::execute)?;
241        let visibility = if store.index_state() == crate::db::IndexState::Ready {
242            QueryPlanVisibility::StoreReady
243        } else {
244            QueryPlanVisibility::StoreNotReady
245        };
246
247        Ok(visibility)
248    }
249
250    pub(in crate::db) fn cached_query_plan_entry_for_authority(
251        &self,
252        authority: crate::db::executor::EntityAuthority,
253        schema_fingerprint: CommitSchemaFingerprint,
254        query: &StructuralQuery,
255    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
256        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
257        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
258        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
259        let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
260            authority,
261            schema_fingerprint,
262            visibility,
263            query,
264            normalized_predicate.as_ref(),
265        );
266
267        {
268            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
269            if let Some(entry) = cached {
270                return Ok((entry, QueryPlanCacheAttribution::hit()));
271            }
272        }
273
274        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
275            &visible_indexes,
276            normalized_predicate,
277        )?;
278        let entry = QueryPlanCacheEntry::new(
279            plan.clone(),
280            SharedPreparedExecutionPlan::from_plan(authority, plan),
281        );
282        self.with_query_plan_cache(|cache| {
283            cache.insert(cache_key, entry.clone());
284        });
285
286        Ok((entry, QueryPlanCacheAttribution::miss()))
287    }
288
289    #[cfg(test)]
290    pub(in crate::db) fn query_plan_cache_key_for_tests(
291        authority: crate::db::executor::EntityAuthority,
292        schema_fingerprint: CommitSchemaFingerprint,
293        visibility: QueryPlanVisibility,
294        query: &StructuralQuery,
295        cache_method_version: u8,
296    ) -> QueryPlanCacheKey {
297        QueryPlanCacheKey::for_authority_with_method_version(
298            authority,
299            schema_fingerprint,
300            visibility,
301            query,
302            cache_method_version,
303        )
304    }
305
306    pub(in crate::db) fn cached_structural_plan_for_authority(
307        &self,
308        authority: crate::db::executor::EntityAuthority,
309        schema_fingerprint: CommitSchemaFingerprint,
310        query: &StructuralQuery,
311    ) -> Result<AccessPlannedQuery, QueryError> {
312        let (entry, _) =
313            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
314
315        Ok(entry.logical_plan().clone())
316    }
317
318    // Resolve the planner-visible index slice for one typed query exactly once
319    // at the session boundary before handing execution/planning off to query-owned logic.
320    fn with_query_visible_indexes<E, T>(
321        &self,
322        query: &Query<E>,
323        op: impl FnOnce(
324            &Query<E>,
325            &crate::db::query::plan::VisibleIndexes<'static>,
326        ) -> Result<T, QueryError>,
327    ) -> Result<T, QueryError>
328    where
329        E: EntityKind<Canister = C>,
330    {
331        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
332        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
333
334        op(query, &visible_indexes)
335    }
336
337    // Resolve one typed structural query onto the shared lower plan cache so
338    // typed/fluent callers do not each duplicate the entity metadata plumbing.
339    fn cached_structural_plan_for_entity<E>(
340        &self,
341        query: &StructuralQuery,
342    ) -> Result<AccessPlannedQuery, QueryError>
343    where
344        E: EntityKind<Canister = C>,
345    {
346        self.cached_structural_plan_for_authority(
347            crate::db::executor::EntityAuthority::for_type::<E>(),
348            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
349            query,
350        )
351    }
352
353    // Resolve one cached structural plan for the typed entity and then
354    // project it into one caller-owned wrapper so planned vs compiled session
355    // surfaces do not each duplicate the same cache lookup and entity wiring.
356    fn map_cached_structural_plan_for_entity<E, T>(
357        &self,
358        query: &StructuralQuery,
359        map: impl FnOnce(AccessPlannedQuery) -> T,
360    ) -> Result<T, QueryError>
361    where
362        E: EntityKind<Canister = C>,
363    {
364        let plan = self.cached_structural_plan_for_entity::<E>(query)?;
365
366        Ok(map(plan))
367    }
368
369    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
370        &self,
371        query: &StructuralQuery,
372    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
373    where
374        E: EntityKind<Canister = C>,
375    {
376        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
377            crate::db::executor::EntityAuthority::for_type::<E>(),
378            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
379            query,
380        )?;
381
382        Ok((entry.typed_prepared_plan::<E>(), attribution))
383    }
384
385    // Compile one typed query using only the indexes currently visible for the
386    // query's recovered store.
387    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
388        &self,
389        query: &Query<E>,
390    ) -> Result<CompiledQuery<E>, QueryError>
391    where
392        E: EntityKind<Canister = C>,
393    {
394        self.map_cached_structural_plan_for_entity::<E, _>(
395            query.structural(),
396            Query::<E>::compiled_query_from_plan,
397        )
398    }
399
400    // Build one logical planned-query shell using only the indexes currently
401    // visible for the query's recovered store.
402    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
403        &self,
404        query: &Query<E>,
405    ) -> Result<PlannedQuery<E>, QueryError>
406    where
407        E: EntityKind<Canister = C>,
408    {
409        self.map_cached_structural_plan_for_entity::<E, _>(
410            query.structural(),
411            Query::<E>::planned_query_from_plan,
412        )
413    }
414
415    // Project one logical explain payload using only planner-visible indexes.
416    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
417        &self,
418        query: &Query<E>,
419    ) -> Result<ExplainPlan, QueryError>
420    where
421        E: EntityKind<Canister = C>,
422    {
423        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
424    }
425
426    // Hash one typed query plan using only the indexes currently visible for
427    // the query's recovered store.
428    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
429        &self,
430        query: &Query<E>,
431    ) -> Result<String, QueryError>
432    where
433        E: EntityKind<Canister = C>,
434    {
435        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
436    }
437
438    // Explain one load execution shape using only planner-visible
439    // indexes from the recovered store state.
440    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
441        &self,
442        query: &Query<E>,
443    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
444    where
445        E: EntityValue + EntityKind<Canister = C>,
446    {
447        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
448    }
449
450    // Render one load execution descriptor plus route diagnostics using
451    // only planner-visible indexes from the recovered store state.
452    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
453        &self,
454        query: &Query<E>,
455    ) -> Result<String, QueryError>
456    where
457        E: EntityValue + EntityKind<Canister = C>,
458    {
459        self.with_query_visible_indexes(
460            query,
461            Query::<E>::explain_execution_verbose_with_visible_indexes,
462        )
463    }
464
465    // Explain one prepared fluent aggregate terminal using only
466    // planner-visible indexes from the recovered store state.
467    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
468        &self,
469        query: &Query<E>,
470        strategy: &S,
471    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
472    where
473        E: EntityValue + EntityKind<Canister = C>,
474        S: PreparedFluentAggregateExplainStrategy,
475    {
476        self.with_query_visible_indexes(query, |query, visible_indexes| {
477            query
478                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
479        })
480    }
481
482    // Explain one `bytes_by(field)` terminal using only planner-visible
483    // indexes from the recovered store state.
484    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
485        &self,
486        query: &Query<E>,
487        target_field: &str,
488    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
489    where
490        E: EntityValue + EntityKind<Canister = C>,
491    {
492        self.with_query_visible_indexes(query, |query, visible_indexes| {
493            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
494        })
495    }
496
497    // Explain one prepared fluent projection terminal using only
498    // planner-visible indexes from the recovered store state.
499    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
500        &self,
501        query: &Query<E>,
502        strategy: &PreparedFluentProjectionStrategy,
503    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
504    where
505        E: EntityValue + EntityKind<Canister = C>,
506    {
507        self.with_query_visible_indexes(query, |query, visible_indexes| {
508            query.explain_prepared_projection_terminal_with_visible_indexes(
509                visible_indexes,
510                strategy,
511            )
512        })
513    }
514
515    // Validate that one execution strategy is admissible for scalar paged load
516    // execution and fail closed on grouped/primary-key-only routes.
517    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
518        match family {
519            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
520                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
521            )),
522            ExecutionFamily::Ordered => Ok(()),
523            ExecutionFamily::Grouped => Err(QueryError::invariant(
524                "grouped queries execute via execute(), not page().execute()",
525            )),
526        }
527    }
528
529    // Validate that one execution strategy is admissible for the grouped
530    // execution surface.
531    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
532        match family {
533            ExecutionFamily::Grouped => Ok(()),
534            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
535                "grouped execution requires grouped logical plans",
536            )),
537        }
538    }
539
540    // Finalize one grouped cursor page into the outward grouped execution
541    // payload so grouped cursor encoding and continuation-shape validation
542    // stay owned by the session boundary.
543    fn finalize_grouped_execution_page(
544        page: GroupedCursorPage,
545        trace: Option<ExecutionTrace>,
546    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
547        let next_cursor = page
548            .next_cursor
549            .map(|token| {
550                let Some(token) = token.as_grouped() else {
551                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
552                };
553
554                token.encode().map_err(|err| {
555                    QueryError::serialize_internal(format!(
556                        "failed to serialize grouped continuation cursor: {err}"
557                    ))
558                })
559            })
560            .transpose()?;
561
562        Ok(PagedGroupedExecutionWithTrace::new(
563            page.rows,
564            next_cursor,
565            trace,
566        ))
567    }
568
569    // Execute one prepared grouped query result and finalize the outward
570    // grouped payload at the session boundary so fluent callers do not
571    // reassemble grouped result wrappers themselves.
572    fn execute_grouped_query_result<E>(
573        &self,
574        query: &Query<E>,
575        cursor_token: Option<&str>,
576    ) -> Result<LoadQueryResult<E>, QueryError>
577    where
578        E: PersistedRow<Canister = C> + EntityValue,
579    {
580        self.execute_grouped(query, cursor_token)
581            .map(LoadQueryResult::grouped)
582    }
583
584    // Execute one prepared grouped query result while returning the same empty
585    // scalar-attribution shell used by the grouped execution path in the
586    // perf-attribution surface.
587    #[cfg(feature = "perf-attribution")]
588    fn execute_grouped_query_result_with_attribution<E>(
589        &self,
590        plan: PreparedExecutionPlan<E>,
591    ) -> Result<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>
592    where
593        E: PersistedRow<Canister = C> + EntityValue,
594    {
595        let (page, trace) = self.execute_grouped_plan_with_trace(plan, None)?;
596        let grouped = Self::finalize_grouped_execution_page(page, trace)?;
597
598        Ok((
599            LoadQueryResult::grouped(grouped),
600            Self::empty_scalar_execute_phase_attribution(),
601            0,
602        ))
603    }
604
605    // Execute one non-grouped prepared query result while normalizing load vs
606    // delete output into the shared outward load-result contract plus the
607    // scalar execution attribution tuple expected by the perf surface.
608    #[cfg(feature = "perf-attribution")]
609    fn execute_scalar_query_result_with_attribution<E>(
610        &self,
611        mode: QueryMode,
612        plan: PreparedExecutionPlan<E>,
613    ) -> Result<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>
614    where
615        E: PersistedRow<Canister = C> + EntityValue,
616    {
617        match mode {
618            QueryMode::Load(_) => {
619                let (rows, phase_attribution, response_decode_local_instructions) = self
620                    .load_executor::<E>()
621                    .execute_with_phase_attribution(plan)
622                    .map_err(QueryError::execute)?;
623
624                Ok((
625                    LoadQueryResult::rows(rows),
626                    phase_attribution,
627                    response_decode_local_instructions,
628                ))
629            }
630            QueryMode::Delete(_) => {
631                let result = self.execute_query_dyn(mode, plan)?;
632
633                Ok((
634                    LoadQueryResult::rows(result),
635                    Self::empty_scalar_execute_phase_attribution(),
636                    0,
637                ))
638            }
639        }
640    }
641
642    /// Execute one scalar load/delete query and return materialized response rows.
643    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
644    where
645        E: PersistedRow<Canister = C> + EntityValue,
646    {
647        // Phase 1: compile typed intent into one prepared execution-plan contract.
648        let mode = query.mode();
649        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
650
651        // Phase 2: delegate execution to the shared compiled-plan entry path.
652        self.execute_query_dyn(mode, plan)
653    }
654
655    /// Execute one typed query while reporting the compile/execute split at
656    /// the shared fluent query seam.
657    #[cfg(feature = "perf-attribution")]
658    #[doc(hidden)]
659    pub fn execute_query_result_with_attribution<E>(
660        &self,
661        query: &Query<E>,
662    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
663    where
664        E: PersistedRow<Canister = C> + EntityValue,
665    {
666        // Phase 1: measure compile work at the typed/fluent boundary,
667        // including the shared lower query-plan cache lookup/build exactly
668        // once. This preserves honest hit/miss attribution without
669        // double-building plans on one-shot cache misses.
670        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
671            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
672        });
673        let (plan, cache_attribution) = plan_and_cache?;
674
675        // Phase 2: execute one query result using the prepared plan produced
676        // by the compile/cache boundary above.
677        let (execute_local_instructions, result) = measure_query_stage(|| {
678            if query.has_grouping() {
679                self.execute_grouped_query_result_with_attribution(plan)
680            } else {
681                self.execute_scalar_query_result_with_attribution(query.mode(), plan)
682            }
683        });
684        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
685        let total_local_instructions =
686            compile_local_instructions.saturating_add(execute_local_instructions);
687
688        Ok((
689            result,
690            QueryExecutionAttribution {
691                compile_local_instructions,
692                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
693                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
694                direct_data_row_scan_local_instructions: execute_phase_attribution
695                    .direct_data_row_scan_local_instructions,
696                direct_data_row_key_stream_local_instructions: execute_phase_attribution
697                    .direct_data_row_key_stream_local_instructions,
698                direct_data_row_row_read_local_instructions: execute_phase_attribution
699                    .direct_data_row_row_read_local_instructions,
700                direct_data_row_key_encode_local_instructions: execute_phase_attribution
701                    .direct_data_row_key_encode_local_instructions,
702                direct_data_row_store_get_local_instructions: execute_phase_attribution
703                    .direct_data_row_store_get_local_instructions,
704                direct_data_row_order_window_local_instructions: execute_phase_attribution
705                    .direct_data_row_order_window_local_instructions,
706                direct_data_row_page_window_local_instructions: execute_phase_attribution
707                    .direct_data_row_page_window_local_instructions,
708                response_decode_local_instructions,
709                execute_local_instructions,
710                total_local_instructions,
711                shared_query_plan_cache_hits: cache_attribution.hits,
712                shared_query_plan_cache_misses: cache_attribution.misses,
713            },
714        ))
715    }
716
717    // Execute one typed query through the unified row/grouped result surface so
718    // higher layers do not need to branch on grouped shape themselves.
719    #[doc(hidden)]
720    pub fn execute_query_result<E>(
721        &self,
722        query: &Query<E>,
723    ) -> Result<LoadQueryResult<E>, QueryError>
724    where
725        E: PersistedRow<Canister = C> + EntityValue,
726    {
727        if query.has_grouping() {
728            return self.execute_grouped_query_result(query, None);
729        }
730
731        self.execute_query(query).map(LoadQueryResult::rows)
732    }
733
734    /// Execute one typed delete query and return only the affected-row count.
735    #[doc(hidden)]
736    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
737    where
738        E: PersistedRow<Canister = C> + EntityValue,
739    {
740        // Phase 1: fail closed if the caller routes a non-delete query here.
741        if !query.mode().is_delete() {
742            return Err(QueryError::unsupported_query(
743                "delete count execution requires delete query mode",
744            ));
745        }
746
747        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
748        let plan = self
749            .compile_query_with_visible_indexes(query)?
750            .into_prepared_execution_plan();
751
752        // Phase 3: execute the shared delete core while skipping response-row materialization.
753        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
754            .map_err(QueryError::execute)
755    }
756
757    /// Execute one scalar query from one pre-built prepared execution contract.
758    ///
759    /// This is the shared compiled-plan entry boundary used by the typed
760    /// `execute_query(...)` surface and adjacent query execution facades.
761    pub(in crate::db) fn execute_query_dyn<E>(
762        &self,
763        mode: QueryMode,
764        plan: PreparedExecutionPlan<E>,
765    ) -> Result<EntityResponse<E>, QueryError>
766    where
767        E: PersistedRow<Canister = C> + EntityValue,
768    {
769        let result = match mode {
770            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
771            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
772        };
773
774        result.map_err(QueryError::execute)
775    }
776
777    // Shared load-query terminal wrapper: build plan, run under metrics, map
778    // execution errors into query-facing errors.
779    pub(in crate::db) fn execute_load_query_with<E, T>(
780        &self,
781        query: &Query<E>,
782        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
783    ) -> Result<T, QueryError>
784    where
785        E: PersistedRow<Canister = C> + EntityValue,
786    {
787        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
788
789        self.with_metrics(|| op(self.load_executor::<E>(), plan))
790            .map_err(QueryError::execute)
791    }
792
793    /// Build one trace payload for a query without executing it.
794    ///
795    /// This lightweight surface is intended for developer diagnostics:
796    /// plan hash, access strategy summary, and planner/executor route shape.
797    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
798    where
799        E: EntityKind<Canister = C>,
800    {
801        let compiled = self.compile_query_with_visible_indexes(query)?;
802        let explain = compiled.explain();
803        let plan_hash = compiled.plan_hash_hex();
804
805        let (executable, _) =
806            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
807        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
808        let execution_family = match query.mode() {
809            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
810            QueryMode::Delete(_) => None,
811        };
812
813        Ok(QueryTracePlan::new(
814            plan_hash,
815            access_strategy,
816            execution_family,
817            explain,
818        ))
819    }
820
821    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
822    pub(crate) fn execute_load_query_paged_with_trace<E>(
823        &self,
824        query: &Query<E>,
825        cursor_token: Option<&str>,
826    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
827    where
828        E: PersistedRow<Canister = C> + EntityValue,
829    {
830        // Phase 1: build/validate prepared execution plan and reject grouped plans.
831        let plan = self
832            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
833            .0;
834        Self::ensure_scalar_paged_execution_family(
835            plan.execution_family().map_err(QueryError::execute)?,
836        )?;
837
838        // Phase 2: decode external cursor token and validate it against plan surface.
839        let cursor_bytes = decode_optional_cursor_token(cursor_token)
840            .map_err(QueryError::from_cursor_plan_error)?;
841        let cursor = plan
842            .prepare_cursor(cursor_bytes.as_deref())
843            .map_err(QueryError::from_executor_plan_error)?;
844
845        // Phase 3: execute one traced page and encode outbound continuation token.
846        let (page, trace) = self
847            .with_metrics(|| {
848                self.load_executor::<E>()
849                    .execute_paged_with_cursor_traced(plan, cursor)
850            })
851            .map_err(QueryError::execute)?;
852        let next_cursor = page
853            .next_cursor
854            .map(|token| {
855                let Some(token) = token.as_scalar() else {
856                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
857                };
858
859                token.encode().map_err(|err| {
860                    QueryError::serialize_internal(format!(
861                        "failed to serialize continuation cursor: {err}"
862                    ))
863                })
864            })
865            .transpose()?;
866
867        Ok(PagedLoadExecutionWithTrace::new(
868            page.items,
869            next_cursor,
870            trace,
871        ))
872    }
873
874    /// Execute one grouped query page with optional grouped continuation cursor.
875    ///
876    /// This is the explicit grouped execution boundary; scalar load APIs reject
877    /// grouped plans to preserve scalar response contracts.
878    pub(in crate::db) fn execute_grouped<E>(
879        &self,
880        query: &Query<E>,
881        cursor_token: Option<&str>,
882    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
883    where
884        E: PersistedRow<Canister = C> + EntityValue,
885    {
886        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
887        Self::finalize_grouped_execution_page(page, trace)
888    }
889
890    // Execute the canonical grouped query core and return the raw grouped page
891    // plus optional execution trace before outward cursor formatting.
892    fn execute_grouped_page_with_trace<E>(
893        &self,
894        query: &Query<E>,
895        cursor_token: Option<&str>,
896    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
897    where
898        E: PersistedRow<Canister = C> + EntityValue,
899    {
900        // Phase 1: build the prepared execution plan once from the typed query.
901        let plan = self
902            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
903            .0;
904
905        // Phase 2: reuse the shared prepared grouped execution path.
906        self.execute_grouped_plan_with_trace(plan, cursor_token)
907    }
908
909    // Execute one grouped prepared plan page with optional grouped cursor.
910    fn execute_grouped_plan_with_trace<E>(
911        &self,
912        plan: PreparedExecutionPlan<E>,
913        cursor_token: Option<&str>,
914    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
915    where
916        E: PersistedRow<Canister = C> + EntityValue,
917    {
918        // Phase 1: validate the prepared plan shape before decoding cursors.
919        Self::ensure_grouped_execution_family(
920            plan.execution_family().map_err(QueryError::execute)?,
921        )?;
922
923        // Phase 2: decode external grouped cursor token and validate against plan.
924        let cursor = decode_optional_grouped_cursor_token(cursor_token)
925            .map_err(QueryError::from_cursor_plan_error)?;
926        let cursor = plan
927            .prepare_grouped_cursor_token(cursor)
928            .map_err(QueryError::from_executor_plan_error)?;
929
930        // Phase 3: execute one grouped page while preserving the structural
931        // grouped cursor payload for whichever outward cursor format the caller needs.
932        self.with_metrics(|| {
933            self.load_executor::<E>()
934                .execute_grouped_paged_with_cursor_traced(plan, cursor)
935        })
936        .map_err(QueryError::execute)
937    }
938}
939
940impl QueryPlanCacheKey {
941    fn for_authority_with_normalized_predicate(
942        authority: crate::db::executor::EntityAuthority,
943        schema_fingerprint: CommitSchemaFingerprint,
944        visibility: QueryPlanVisibility,
945        query: &StructuralQuery,
946        normalized_predicate: Option<&crate::db::predicate::Predicate>,
947    ) -> Self {
948        Self::for_authority_with_normalized_predicate_and_method_version(
949            authority,
950            schema_fingerprint,
951            visibility,
952            query,
953            normalized_predicate,
954            SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
955        )
956    }
957
958    // Assemble the canonical cache-key shell once so the test and
959    // normalized-predicate constructors only decide which structural query key
960    // they feed into the shared session cache identity.
961    const fn from_authority_parts(
962        authority: crate::db::executor::EntityAuthority,
963        schema_fingerprint: CommitSchemaFingerprint,
964        visibility: QueryPlanVisibility,
965        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
966        cache_method_version: u8,
967    ) -> Self {
968        Self {
969            cache_method_version,
970            entity_path: authority.entity_path(),
971            schema_fingerprint,
972            visibility,
973            structural_query,
974        }
975    }
976
977    #[cfg(test)]
978    fn for_authority_with_method_version(
979        authority: crate::db::executor::EntityAuthority,
980        schema_fingerprint: CommitSchemaFingerprint,
981        visibility: QueryPlanVisibility,
982        query: &StructuralQuery,
983        cache_method_version: u8,
984    ) -> Self {
985        Self::from_authority_parts(
986            authority,
987            schema_fingerprint,
988            visibility,
989            query.structural_cache_key(),
990            cache_method_version,
991        )
992    }
993
994    fn for_authority_with_normalized_predicate_and_method_version(
995        authority: crate::db::executor::EntityAuthority,
996        schema_fingerprint: CommitSchemaFingerprint,
997        visibility: QueryPlanVisibility,
998        query: &StructuralQuery,
999        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1000        cache_method_version: u8,
1001    ) -> Self {
1002        Self::from_authority_parts(
1003            authority,
1004            schema_fingerprint,
1005            visibility,
1006            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1007            cache_method_version,
1008        )
1009    }
1010}