Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        access::AccessStrategy,
16        commit::CommitSchemaFingerprint,
17        cursor::{
18            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19        },
20        diagnostics::ExecutionTrace,
21        executor::{
22            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23            SharedPreparedExecutionPlan,
24        },
25        predicate::predicate_fingerprint_normalized,
26        query::builder::{
27            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
28        },
29        query::explain::{
30            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
31        },
32        query::{
33            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
34            plan::{QueryMode, VisibleIndexes},
35        },
36    },
37    error::InternalError,
38    model::entity::EntityModel,
39    traits::{CanisterKind, EntityKind, EntityValue, Path},
40};
41#[cfg(feature = "diagnostics")]
42use candid::CandidType;
43use icydb_utils::Xxh3;
44#[cfg(feature = "diagnostics")]
45use serde::Deserialize;
46use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
47
48type CacheBuildHasher = BuildHasherDefault<Xxh3>;
49
50// Bump this when the shared lower query-plan cache key meaning changes in a
51// way that must force old in-heap entries to miss instead of aliasing.
52const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
53
54#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
55pub(in crate::db) enum QueryPlanVisibility {
56    StoreNotReady,
57    StoreReady,
58}
59
60#[derive(Clone, Debug, Eq, Hash, PartialEq)]
61pub(in crate::db) struct QueryPlanCacheKey {
62    cache_method_version: u8,
63    entity_path: &'static str,
64    schema_fingerprint: CommitSchemaFingerprint,
65    visibility: QueryPlanVisibility,
66    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
67}
68
69pub(in crate::db) type QueryPlanCache =
70    HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan, CacheBuildHasher>;
71
72thread_local! {
73    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
74    // facades can share prepared logical plans across update/query calls while
75    // tests and multi-registry host processes remain isolated by registry
76    // identity.
77    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
78        RefCell::new(HashMap::default());
79}
80
81#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
82pub(in crate::db) struct QueryPlanCacheAttribution {
83    pub hits: u64,
84    pub misses: u64,
85}
86
87impl QueryPlanCacheAttribution {
88    #[must_use]
89    const fn hit() -> Self {
90        Self { hits: 1, misses: 0 }
91    }
92
93    #[must_use]
94    const fn miss() -> Self {
95        Self { hits: 0, misses: 1 }
96    }
97}
98
99///
100/// QueryExecutionAttribution
101///
102/// QueryExecutionAttribution records the top-level compile/execute split for
103/// typed/fluent query execution at the session boundary.
104///
105#[cfg(feature = "diagnostics")]
106#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
107pub struct QueryExecutionAttribution {
108    pub compile_local_instructions: u64,
109    pub runtime_local_instructions: u64,
110    pub finalize_local_instructions: u64,
111    pub direct_data_row_scan_local_instructions: u64,
112    pub direct_data_row_key_stream_local_instructions: u64,
113    pub direct_data_row_row_read_local_instructions: u64,
114    pub direct_data_row_key_encode_local_instructions: u64,
115    pub direct_data_row_store_get_local_instructions: u64,
116    pub direct_data_row_order_window_local_instructions: u64,
117    pub direct_data_row_page_window_local_instructions: u64,
118    pub grouped_stream_local_instructions: u64,
119    pub grouped_fold_local_instructions: u64,
120    pub grouped_finalize_local_instructions: u64,
121    pub grouped_count_borrowed_hash_computations: u64,
122    pub grouped_count_bucket_candidate_checks: u64,
123    pub grouped_count_existing_group_hits: u64,
124    pub grouped_count_new_group_inserts: u64,
125    pub grouped_count_row_materialization_local_instructions: u64,
126    pub grouped_count_group_lookup_local_instructions: u64,
127    pub grouped_count_existing_group_update_local_instructions: u64,
128    pub grouped_count_new_group_insert_local_instructions: u64,
129    pub response_decode_local_instructions: u64,
130    pub execute_local_instructions: u64,
131    pub total_local_instructions: u64,
132    pub shared_query_plan_cache_hits: u64,
133    pub shared_query_plan_cache_misses: u64,
134}
135
136#[cfg(feature = "diagnostics")]
137#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
138struct QueryExecutePhaseAttribution {
139    runtime_local_instructions: u64,
140    finalize_local_instructions: u64,
141    direct_data_row_scan_local_instructions: u64,
142    direct_data_row_key_stream_local_instructions: u64,
143    direct_data_row_row_read_local_instructions: u64,
144    direct_data_row_key_encode_local_instructions: u64,
145    direct_data_row_store_get_local_instructions: u64,
146    direct_data_row_order_window_local_instructions: u64,
147    direct_data_row_page_window_local_instructions: u64,
148    grouped_stream_local_instructions: u64,
149    grouped_fold_local_instructions: u64,
150    grouped_finalize_local_instructions: u64,
151    grouped_count: GroupedCountAttribution,
152}
153
154#[cfg(feature = "diagnostics")]
155#[expect(
156    clippy::missing_const_for_fn,
157    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
158)]
159fn read_query_local_instruction_counter() -> u64 {
160    #[cfg(target_arch = "wasm32")]
161    {
162        canic_cdk::api::performance_counter(1)
163    }
164
165    #[cfg(not(target_arch = "wasm32"))]
166    {
167        0
168    }
169}
170
171#[cfg(feature = "diagnostics")]
172fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
173    let start = read_query_local_instruction_counter();
174    let result = run();
175    let delta = read_query_local_instruction_counter().saturating_sub(start);
176
177    (delta, result)
178}
179
180impl<C: CanisterKind> DbSession<C> {
181    #[cfg(feature = "diagnostics")]
182    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
183        QueryExecutePhaseAttribution {
184            runtime_local_instructions: 0,
185            finalize_local_instructions: 0,
186            direct_data_row_scan_local_instructions: 0,
187            direct_data_row_key_stream_local_instructions: 0,
188            direct_data_row_row_read_local_instructions: 0,
189            direct_data_row_key_encode_local_instructions: 0,
190            direct_data_row_store_get_local_instructions: 0,
191            direct_data_row_order_window_local_instructions: 0,
192            direct_data_row_page_window_local_instructions: 0,
193            grouped_stream_local_instructions: 0,
194            grouped_fold_local_instructions: 0,
195            grouped_finalize_local_instructions: 0,
196            grouped_count: GroupedCountAttribution::none(),
197        }
198    }
199
200    #[cfg(feature = "diagnostics")]
201    const fn scalar_query_execute_phase_attribution(
202        phase: ScalarExecutePhaseAttribution,
203    ) -> QueryExecutePhaseAttribution {
204        QueryExecutePhaseAttribution {
205            runtime_local_instructions: phase.runtime_local_instructions,
206            finalize_local_instructions: phase.finalize_local_instructions,
207            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
208            direct_data_row_key_stream_local_instructions: phase
209                .direct_data_row_key_stream_local_instructions,
210            direct_data_row_row_read_local_instructions: phase
211                .direct_data_row_row_read_local_instructions,
212            direct_data_row_key_encode_local_instructions: phase
213                .direct_data_row_key_encode_local_instructions,
214            direct_data_row_store_get_local_instructions: phase
215                .direct_data_row_store_get_local_instructions,
216            direct_data_row_order_window_local_instructions: phase
217                .direct_data_row_order_window_local_instructions,
218            direct_data_row_page_window_local_instructions: phase
219                .direct_data_row_page_window_local_instructions,
220            grouped_stream_local_instructions: 0,
221            grouped_fold_local_instructions: 0,
222            grouped_finalize_local_instructions: 0,
223            grouped_count: GroupedCountAttribution::none(),
224        }
225    }
226
227    #[cfg(feature = "diagnostics")]
228    const fn grouped_query_execute_phase_attribution(
229        phase: GroupedExecutePhaseAttribution,
230    ) -> QueryExecutePhaseAttribution {
231        QueryExecutePhaseAttribution {
232            runtime_local_instructions: phase
233                .stream_local_instructions
234                .saturating_add(phase.fold_local_instructions),
235            finalize_local_instructions: phase.finalize_local_instructions,
236            direct_data_row_scan_local_instructions: 0,
237            direct_data_row_key_stream_local_instructions: 0,
238            direct_data_row_row_read_local_instructions: 0,
239            direct_data_row_key_encode_local_instructions: 0,
240            direct_data_row_store_get_local_instructions: 0,
241            direct_data_row_order_window_local_instructions: 0,
242            direct_data_row_page_window_local_instructions: 0,
243            grouped_stream_local_instructions: phase.stream_local_instructions,
244            grouped_fold_local_instructions: phase.fold_local_instructions,
245            grouped_finalize_local_instructions: phase.finalize_local_instructions,
246            grouped_count: phase.grouped_count,
247        }
248    }
249
250    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
251        let scope_id = self.db.cache_scope_id();
252
253        QUERY_PLAN_CACHES.with(|caches| {
254            let mut caches = caches.borrow_mut();
255            let cache = caches.entry(scope_id).or_default();
256
257            f(cache)
258        })
259    }
260
261    const fn visible_indexes_for_model(
262        model: &'static EntityModel,
263        visibility: QueryPlanVisibility,
264    ) -> VisibleIndexes<'static> {
265        match visibility {
266            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
267            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
268        }
269    }
270
271    #[cfg(test)]
272    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
273        self.with_query_plan_cache(|cache| cache.len())
274    }
275
276    #[cfg(test)]
277    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
278        self.with_query_plan_cache(QueryPlanCache::clear);
279    }
280
281    pub(in crate::db) fn query_plan_visibility_for_store_path(
282        &self,
283        store_path: &'static str,
284    ) -> Result<QueryPlanVisibility, QueryError> {
285        let store = self
286            .db
287            .recovered_store(store_path)
288            .map_err(QueryError::execute)?;
289        let visibility = if store.index_state() == crate::db::IndexState::Ready {
290            QueryPlanVisibility::StoreReady
291        } else {
292            QueryPlanVisibility::StoreNotReady
293        };
294
295        Ok(visibility)
296    }
297
298    pub(in crate::db) fn cached_shared_query_plan_for_authority(
299        &self,
300        authority: crate::db::executor::EntityAuthority,
301        schema_fingerprint: CommitSchemaFingerprint,
302        query: &StructuralQuery,
303    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
304        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
305        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
306        let planning_state = query.prepare_scalar_planning_state()?;
307        let normalized_predicate_fingerprint = planning_state
308            .normalized_predicate()
309            .map(predicate_fingerprint_normalized);
310        let cache_key =
311            QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
312                authority,
313                schema_fingerprint,
314                visibility,
315                query,
316                normalized_predicate_fingerprint,
317                SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
318            );
319
320        {
321            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
322            if let Some(prepared_plan) = cached {
323                return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
324            }
325        }
326
327        let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
328            &visible_indexes,
329            planning_state,
330        )?;
331        let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
332        self.with_query_plan_cache(|cache| {
333            cache.insert(cache_key, prepared_plan.clone());
334        });
335
336        Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
337    }
338
339    #[cfg(test)]
340    pub(in crate::db) fn query_plan_cache_key_for_tests(
341        authority: crate::db::executor::EntityAuthority,
342        schema_fingerprint: CommitSchemaFingerprint,
343        visibility: QueryPlanVisibility,
344        query: &StructuralQuery,
345        cache_method_version: u8,
346    ) -> QueryPlanCacheKey {
347        QueryPlanCacheKey::for_authority_with_method_version(
348            authority,
349            schema_fingerprint,
350            visibility,
351            query,
352            cache_method_version,
353        )
354    }
355
356    // Resolve the planner-visible index slice for one typed query exactly once
357    // at the session boundary before handing execution/planning off to query-owned logic.
358    fn with_query_visible_indexes<E, T>(
359        &self,
360        query: &Query<E>,
361        op: impl FnOnce(
362            &Query<E>,
363            &crate::db::query::plan::VisibleIndexes<'static>,
364        ) -> Result<T, QueryError>,
365    ) -> Result<T, QueryError>
366    where
367        E: EntityKind<Canister = C>,
368    {
369        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
370        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
371
372        op(query, &visible_indexes)
373    }
374
375    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
376        &self,
377        query: &Query<E>,
378    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
379    where
380        E: EntityKind<Canister = C>,
381    {
382        let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
383
384        Ok((prepared_plan.typed_clone::<E>(), attribution))
385    }
386
387    // Resolve one typed query through the shared lower query-plan cache using
388    // the canonical authority and schema-fingerprint pair for that entity.
389    fn cached_shared_query_plan_for_entity<E>(
390        &self,
391        query: &Query<E>,
392    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
393    where
394        E: EntityKind<Canister = C>,
395    {
396        self.cached_shared_query_plan_for_authority(
397            crate::db::executor::EntityAuthority::for_type::<E>(),
398            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
399            query.structural(),
400        )
401    }
402
403    // Map one typed query onto one cached lower prepared plan so query-owned
404    // planned and compiled wrappers do not each repeat the same cache lookup.
405    fn map_cached_shared_query_plan_for_entity<E, T>(
406        &self,
407        query: &Query<E>,
408        map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
409    ) -> Result<T, QueryError>
410    where
411        E: EntityKind<Canister = C>,
412    {
413        let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
414
415        Ok(map(prepared_plan))
416    }
417
418    // Compile one typed query using only the indexes currently visible for the
419    // query's recovered store.
420    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
421        &self,
422        query: &Query<E>,
423    ) -> Result<CompiledQuery<E>, QueryError>
424    where
425        E: EntityKind<Canister = C>,
426    {
427        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
428    }
429
430    // Build one logical planned-query shell using only the indexes currently
431    // visible for the query's recovered store.
432    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
433        &self,
434        query: &Query<E>,
435    ) -> Result<PlannedQuery<E>, QueryError>
436    where
437        E: EntityKind<Canister = C>,
438    {
439        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
440    }
441
442    // Project one logical explain payload using only planner-visible indexes.
443    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
444        &self,
445        query: &Query<E>,
446    ) -> Result<ExplainPlan, QueryError>
447    where
448        E: EntityKind<Canister = C>,
449    {
450        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
451    }
452
453    // Hash one typed query plan using only the indexes currently visible for
454    // the query's recovered store.
455    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
456        &self,
457        query: &Query<E>,
458    ) -> Result<String, QueryError>
459    where
460        E: EntityKind<Canister = C>,
461    {
462        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
463    }
464
465    // Explain one load execution shape using only planner-visible
466    // indexes from the recovered store state.
467    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
468        &self,
469        query: &Query<E>,
470    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
471    where
472        E: EntityValue + EntityKind<Canister = C>,
473    {
474        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
475    }
476
477    // Render one load execution descriptor plus route diagnostics using
478    // only planner-visible indexes from the recovered store state.
479    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
480        &self,
481        query: &Query<E>,
482    ) -> Result<String, QueryError>
483    where
484        E: EntityValue + EntityKind<Canister = C>,
485    {
486        self.with_query_visible_indexes(
487            query,
488            Query::<E>::explain_execution_verbose_with_visible_indexes,
489        )
490    }
491
492    // Explain one prepared fluent aggregate terminal using only
493    // planner-visible indexes from the recovered store state.
494    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
495        &self,
496        query: &Query<E>,
497        strategy: &S,
498    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
499    where
500        E: EntityValue + EntityKind<Canister = C>,
501        S: PreparedFluentAggregateExplainStrategy,
502    {
503        self.with_query_visible_indexes(query, |query, visible_indexes| {
504            query
505                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
506        })
507    }
508
509    // Explain one `bytes_by(field)` terminal using only planner-visible
510    // indexes from the recovered store state.
511    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
512        &self,
513        query: &Query<E>,
514        target_field: &str,
515    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
516    where
517        E: EntityValue + EntityKind<Canister = C>,
518    {
519        self.with_query_visible_indexes(query, |query, visible_indexes| {
520            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
521        })
522    }
523
524    // Explain one prepared fluent projection terminal using only
525    // planner-visible indexes from the recovered store state.
526    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
527        &self,
528        query: &Query<E>,
529        strategy: &PreparedFluentProjectionStrategy,
530    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
531    where
532        E: EntityValue + EntityKind<Canister = C>,
533    {
534        self.with_query_visible_indexes(query, |query, visible_indexes| {
535            query.explain_prepared_projection_terminal_with_visible_indexes(
536                visible_indexes,
537                strategy,
538            )
539        })
540    }
541
542    // Validate that one execution strategy is admissible for scalar paged load
543    // execution and fail closed on grouped/primary-key-only routes.
544    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
545        match family {
546            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
547                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
548            )),
549            ExecutionFamily::Ordered => Ok(()),
550            ExecutionFamily::Grouped => Err(QueryError::invariant(
551                "grouped queries execute via execute(), not page().execute()",
552            )),
553        }
554    }
555
556    // Validate that one execution strategy is admissible for the grouped
557    // execution surface.
558    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
559        match family {
560            ExecutionFamily::Grouped => Ok(()),
561            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
562                "grouped execution requires grouped logical plans",
563            )),
564        }
565    }
566
567    // Finalize one grouped cursor page into the outward grouped execution
568    // payload so grouped cursor encoding and continuation-shape validation
569    // stay owned by the session boundary.
570    fn finalize_grouped_execution_page(
571        page: GroupedCursorPage,
572        trace: Option<ExecutionTrace>,
573    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
574        let next_cursor = page
575            .next_cursor
576            .map(|token| {
577                let Some(token) = token.as_grouped() else {
578                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
579                };
580
581                token.encode().map_err(|err| {
582                    QueryError::serialize_internal(format!(
583                        "failed to serialize grouped continuation cursor: {err}"
584                    ))
585                })
586            })
587            .transpose()?;
588
589        Ok(PagedGroupedExecutionWithTrace::new(
590            page.rows,
591            next_cursor,
592            trace,
593        ))
594    }
595
596    /// Execute one scalar load/delete query and return materialized response rows.
597    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
598    where
599        E: PersistedRow<Canister = C> + EntityValue,
600    {
601        // Phase 1: compile typed intent into one prepared execution-plan contract.
602        let mode = query.mode();
603        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
604
605        // Phase 2: delegate execution to the shared compiled-plan entry path.
606        self.execute_query_dyn(mode, plan)
607    }
608
609    /// Execute one typed query while reporting the compile/execute split at
610    /// the shared fluent query seam.
611    #[cfg(feature = "diagnostics")]
612    #[doc(hidden)]
613    #[expect(
614        clippy::too_many_lines,
615        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
616    )]
617    pub fn execute_query_result_with_attribution<E>(
618        &self,
619        query: &Query<E>,
620    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
621    where
622        E: PersistedRow<Canister = C> + EntityValue,
623    {
624        // Phase 1: measure compile work at the typed/fluent boundary,
625        // including the shared lower query-plan cache lookup/build exactly
626        // once. This preserves honest hit/miss attribution without
627        // double-building plans on one-shot cache misses.
628        let (compile_local_instructions, plan_and_cache) =
629            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
630        let (plan, cache_attribution) = plan_and_cache?;
631
632        // Phase 2: execute one query result using the prepared plan produced
633        // by the compile/cache boundary above.
634        let (execute_local_instructions, result) = measure_query_stage(
635            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
636                if query.has_grouping() {
637                    let (page, trace, phase_attribution) =
638                        self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
639                            executor
640                                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
641                                    plan, cursor,
642                                )
643                        })?;
644                    let grouped = Self::finalize_grouped_execution_page(page, trace)?;
645
646                    Ok((
647                        LoadQueryResult::Grouped(grouped),
648                        Self::grouped_query_execute_phase_attribution(phase_attribution),
649                        0,
650                    ))
651                } else {
652                    match query.mode() {
653                        QueryMode::Load(_) => {
654                            let (rows, phase_attribution, response_decode_local_instructions) =
655                                self.load_executor::<E>()
656                                    .execute_with_phase_attribution(plan)
657                                    .map_err(QueryError::execute)?;
658
659                            Ok((
660                                LoadQueryResult::Rows(rows),
661                                Self::scalar_query_execute_phase_attribution(phase_attribution),
662                                response_decode_local_instructions,
663                            ))
664                        }
665                        QueryMode::Delete(_) => {
666                            let result = self.execute_query_dyn(query.mode(), plan)?;
667
668                            Ok((
669                                LoadQueryResult::Rows(result),
670                                Self::empty_query_execute_phase_attribution(),
671                                0,
672                            ))
673                        }
674                    }
675                }
676            },
677        );
678        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
679        let total_local_instructions =
680            compile_local_instructions.saturating_add(execute_local_instructions);
681
682        Ok((
683            result,
684            QueryExecutionAttribution {
685                compile_local_instructions,
686                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
687                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
688                direct_data_row_scan_local_instructions: execute_phase_attribution
689                    .direct_data_row_scan_local_instructions,
690                direct_data_row_key_stream_local_instructions: execute_phase_attribution
691                    .direct_data_row_key_stream_local_instructions,
692                direct_data_row_row_read_local_instructions: execute_phase_attribution
693                    .direct_data_row_row_read_local_instructions,
694                direct_data_row_key_encode_local_instructions: execute_phase_attribution
695                    .direct_data_row_key_encode_local_instructions,
696                direct_data_row_store_get_local_instructions: execute_phase_attribution
697                    .direct_data_row_store_get_local_instructions,
698                direct_data_row_order_window_local_instructions: execute_phase_attribution
699                    .direct_data_row_order_window_local_instructions,
700                direct_data_row_page_window_local_instructions: execute_phase_attribution
701                    .direct_data_row_page_window_local_instructions,
702                grouped_stream_local_instructions: execute_phase_attribution
703                    .grouped_stream_local_instructions,
704                grouped_fold_local_instructions: execute_phase_attribution
705                    .grouped_fold_local_instructions,
706                grouped_finalize_local_instructions: execute_phase_attribution
707                    .grouped_finalize_local_instructions,
708                grouped_count_borrowed_hash_computations: execute_phase_attribution
709                    .grouped_count
710                    .borrowed_hash_computations,
711                grouped_count_bucket_candidate_checks: execute_phase_attribution
712                    .grouped_count
713                    .bucket_candidate_checks,
714                grouped_count_existing_group_hits: execute_phase_attribution
715                    .grouped_count
716                    .existing_group_hits,
717                grouped_count_new_group_inserts: execute_phase_attribution
718                    .grouped_count
719                    .new_group_inserts,
720                grouped_count_row_materialization_local_instructions: execute_phase_attribution
721                    .grouped_count
722                    .row_materialization_local_instructions,
723                grouped_count_group_lookup_local_instructions: execute_phase_attribution
724                    .grouped_count
725                    .group_lookup_local_instructions,
726                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
727                    .grouped_count
728                    .existing_group_update_local_instructions,
729                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
730                    .grouped_count
731                    .new_group_insert_local_instructions,
732                response_decode_local_instructions,
733                execute_local_instructions,
734                total_local_instructions,
735                shared_query_plan_cache_hits: cache_attribution.hits,
736                shared_query_plan_cache_misses: cache_attribution.misses,
737            },
738        ))
739    }
740
741    // Execute one typed query through the unified row/grouped result surface so
742    // higher layers do not need to branch on grouped shape themselves.
743    #[doc(hidden)]
744    pub fn execute_query_result<E>(
745        &self,
746        query: &Query<E>,
747    ) -> Result<LoadQueryResult<E>, QueryError>
748    where
749        E: PersistedRow<Canister = C> + EntityValue,
750    {
751        if query.has_grouping() {
752            return self
753                .execute_grouped(query, None)
754                .map(LoadQueryResult::Grouped);
755        }
756
757        self.execute_query(query).map(LoadQueryResult::Rows)
758    }
759
760    /// Execute one typed delete query and return only the affected-row count.
761    #[doc(hidden)]
762    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
763    where
764        E: PersistedRow<Canister = C> + EntityValue,
765    {
766        // Phase 1: fail closed if the caller routes a non-delete query here.
767        if !query.mode().is_delete() {
768            return Err(QueryError::unsupported_query(
769                "delete count execution requires delete query mode",
770            ));
771        }
772
773        // Phase 2: resolve one cached prepared execution-plan contract directly
774        // from the shared lower boundary instead of rebuilding it through the
775        // typed compiled-query wrapper.
776        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
777
778        // Phase 3: execute the shared delete core while skipping response-row materialization.
779        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
780            .map_err(QueryError::execute)
781    }
782
783    /// Execute one scalar query from one pre-built prepared execution contract.
784    ///
785    /// This is the shared compiled-plan entry boundary used by the typed
786    /// `execute_query(...)` surface and adjacent query execution facades.
787    pub(in crate::db) fn execute_query_dyn<E>(
788        &self,
789        mode: QueryMode,
790        plan: PreparedExecutionPlan<E>,
791    ) -> Result<EntityResponse<E>, QueryError>
792    where
793        E: PersistedRow<Canister = C> + EntityValue,
794    {
795        let result = match mode {
796            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
797            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
798        };
799
800        result.map_err(QueryError::execute)
801    }
802
803    // Shared load-query terminal wrapper: build plan, run under metrics, map
804    // execution errors into query-facing errors.
805    pub(in crate::db) fn execute_load_query_with<E, T>(
806        &self,
807        query: &Query<E>,
808        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
809    ) -> Result<T, QueryError>
810    where
811        E: PersistedRow<Canister = C> + EntityValue,
812    {
813        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
814
815        self.with_metrics(|| op(self.load_executor::<E>(), plan))
816            .map_err(QueryError::execute)
817    }
818
819    /// Build one trace payload for a query without executing it.
820    ///
821    /// This lightweight surface is intended for developer diagnostics:
822    /// plan hash, access strategy summary, and planner/executor route shape.
823    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
824    where
825        E: EntityKind<Canister = C>,
826    {
827        let (prepared_plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
828        let logical_plan = prepared_plan.logical_plan();
829        let explain = logical_plan.explain();
830        let plan_hash = logical_plan.fingerprint().to_string();
831        let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
832        let execution_family = match query.mode() {
833            QueryMode::Load(_) => Some(
834                prepared_plan
835                    .execution_family()
836                    .map_err(QueryError::execute)?,
837            ),
838            QueryMode::Delete(_) => None,
839        };
840
841        Ok(QueryTracePlan::new(
842            plan_hash,
843            access_strategy,
844            execution_family,
845            explain,
846        ))
847    }
848
849    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
850    pub(crate) fn execute_load_query_paged_with_trace<E>(
851        &self,
852        query: &Query<E>,
853        cursor_token: Option<&str>,
854    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
855    where
856        E: PersistedRow<Canister = C> + EntityValue,
857    {
858        // Phase 1: build/validate prepared execution plan and reject grouped plans.
859        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
860        Self::ensure_scalar_paged_execution_family(
861            plan.execution_family().map_err(QueryError::execute)?,
862        )?;
863
864        // Phase 2: decode external cursor token and validate it against plan surface.
865        let cursor_bytes = decode_optional_cursor_token(cursor_token)
866            .map_err(QueryError::from_cursor_plan_error)?;
867        let cursor = plan
868            .prepare_cursor(cursor_bytes.as_deref())
869            .map_err(QueryError::from_executor_plan_error)?;
870
871        // Phase 3: execute one traced page and encode outbound continuation token.
872        let (page, trace) = self
873            .with_metrics(|| {
874                self.load_executor::<E>()
875                    .execute_paged_with_cursor_traced(plan, cursor)
876            })
877            .map_err(QueryError::execute)?;
878        let next_cursor = page
879            .next_cursor
880            .map(|token| {
881                let Some(token) = token.as_scalar() else {
882                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
883                };
884
885                token.encode().map_err(|err| {
886                    QueryError::serialize_internal(format!(
887                        "failed to serialize continuation cursor: {err}"
888                    ))
889                })
890            })
891            .transpose()?;
892
893        Ok(PagedLoadExecutionWithTrace::new(
894            page.items,
895            next_cursor,
896            trace,
897        ))
898    }
899
900    /// Execute one grouped query page with optional grouped continuation cursor.
901    ///
902    /// This is the explicit grouped execution boundary; scalar load APIs reject
903    /// grouped plans to preserve scalar response contracts.
904    pub(in crate::db) fn execute_grouped<E>(
905        &self,
906        query: &Query<E>,
907        cursor_token: Option<&str>,
908    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
909    where
910        E: PersistedRow<Canister = C> + EntityValue,
911    {
912        // Phase 1: build the prepared execution plan once from the typed query.
913        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
914
915        // Phase 2: reuse the shared prepared grouped execution path and then
916        // finalize the outward grouped payload at the session boundary.
917        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
918
919        Self::finalize_grouped_execution_page(page, trace)
920    }
921
922    // Execute one grouped prepared plan page with optional grouped cursor
923    // while letting the caller choose the final grouped-runtime dispatch.
924    fn execute_grouped_plan_with<E, T>(
925        &self,
926        plan: PreparedExecutionPlan<E>,
927        cursor_token: Option<&str>,
928        op: impl FnOnce(
929            LoadExecutor<E>,
930            PreparedExecutionPlan<E>,
931            crate::db::cursor::GroupedPlannedCursor,
932        ) -> Result<T, InternalError>,
933    ) -> Result<T, QueryError>
934    where
935        E: PersistedRow<Canister = C> + EntityValue,
936    {
937        // Phase 1: validate the prepared plan shape before decoding cursors.
938        Self::ensure_grouped_execution_family(
939            plan.execution_family().map_err(QueryError::execute)?,
940        )?;
941
942        // Phase 2: decode external grouped cursor token and validate against plan.
943        let cursor = decode_optional_grouped_cursor_token(cursor_token)
944            .map_err(QueryError::from_cursor_plan_error)?;
945        let cursor = plan
946            .prepare_grouped_cursor_token(cursor)
947            .map_err(QueryError::from_executor_plan_error)?;
948
949        // Phase 3: execute one grouped page while preserving the structural
950        // grouped cursor payload for whichever outward cursor format the caller needs.
951        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
952            .map_err(QueryError::execute)
953    }
954
955    // Execute one grouped prepared plan page with optional grouped cursor.
956    fn execute_grouped_plan_with_trace<E>(
957        &self,
958        plan: PreparedExecutionPlan<E>,
959        cursor_token: Option<&str>,
960    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
961    where
962        E: PersistedRow<Canister = C> + EntityValue,
963    {
964        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
965            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
966        })
967    }
968}
969
970impl QueryPlanCacheKey {
971    // Assemble the canonical cache-key shell once so the test and
972    // normalized-predicate constructors only decide which structural query key
973    // they feed into the shared session cache identity.
974    const fn from_authority_parts(
975        authority: crate::db::executor::EntityAuthority,
976        schema_fingerprint: CommitSchemaFingerprint,
977        visibility: QueryPlanVisibility,
978        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
979        cache_method_version: u8,
980    ) -> Self {
981        Self {
982            cache_method_version,
983            entity_path: authority.entity_path(),
984            schema_fingerprint,
985            visibility,
986            structural_query,
987        }
988    }
989
990    #[cfg(test)]
991    fn for_authority_with_method_version(
992        authority: crate::db::executor::EntityAuthority,
993        schema_fingerprint: CommitSchemaFingerprint,
994        visibility: QueryPlanVisibility,
995        query: &StructuralQuery,
996        cache_method_version: u8,
997    ) -> Self {
998        Self::from_authority_parts(
999            authority,
1000            schema_fingerprint,
1001            visibility,
1002            query.structural_cache_key(),
1003            cache_method_version,
1004        )
1005    }
1006
1007    fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1008        authority: crate::db::executor::EntityAuthority,
1009        schema_fingerprint: CommitSchemaFingerprint,
1010        visibility: QueryPlanVisibility,
1011        query: &StructuralQuery,
1012        normalized_predicate_fingerprint: Option<[u8; 32]>,
1013        cache_method_version: u8,
1014    ) -> Self {
1015        Self::from_authority_parts(
1016            authority,
1017            schema_fingerprint,
1018            visibility,
1019            query.structural_cache_key_with_normalized_predicate_fingerprint(
1020                normalized_predicate_fingerprint,
1021            ),
1022            cache_method_version,
1023        )
1024    }
1025}