Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        access::AccessStrategy,
16        commit::CommitSchemaFingerprint,
17        cursor::{
18            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19        },
20        diagnostics::ExecutionTrace,
21        executor::{
22            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23            SharedPreparedExecutionPlan,
24        },
25        query::builder::{
26            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
27        },
28        query::explain::{
29            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
30        },
31        query::{
32            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
33            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
34        },
35    },
36    error::InternalError,
37    model::entity::EntityModel,
38    traits::{CanisterKind, EntityKind, EntityValue, Path},
39};
40#[cfg(feature = "diagnostics")]
41use candid::CandidType;
42use icydb_utils::Xxh3;
43#[cfg(feature = "diagnostics")]
44use serde::Deserialize;
45use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
46
47type CacheBuildHasher = BuildHasherDefault<Xxh3>;
48
49// Bump this when the shared lower query-plan cache key meaning changes in a
50// way that must force old in-heap entries to miss instead of aliasing.
51const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
52
53#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
54pub(in crate::db) enum QueryPlanVisibility {
55    StoreNotReady,
56    StoreReady,
57}
58
59#[derive(Clone, Debug, Eq, Hash, PartialEq)]
60pub(in crate::db) struct QueryPlanCacheKey {
61    cache_method_version: u8,
62    entity_path: &'static str,
63    schema_fingerprint: CommitSchemaFingerprint,
64    visibility: QueryPlanVisibility,
65    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
66}
67
68#[derive(Clone, Debug)]
69pub(in crate::db) struct QueryPlanCacheEntry {
70    logical_plan: AccessPlannedQuery,
71    prepared_plan: SharedPreparedExecutionPlan,
72}
73
74impl QueryPlanCacheEntry {
75    #[must_use]
76    pub(in crate::db) const fn new(
77        logical_plan: AccessPlannedQuery,
78        prepared_plan: SharedPreparedExecutionPlan,
79    ) -> Self {
80        Self {
81            logical_plan,
82            prepared_plan,
83        }
84    }
85
86    #[must_use]
87    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
88        &self.logical_plan
89    }
90
91    #[must_use]
92    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
93        self.prepared_plan.typed_clone::<E>()
94    }
95
96    #[must_use]
97    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
98        &self.prepared_plan
99    }
100}
101
102pub(in crate::db) type QueryPlanCache =
103    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
104
105thread_local! {
106    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
107    // facades can share prepared logical plans across update/query calls while
108    // tests and multi-registry host processes remain isolated by registry
109    // identity.
110    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
111        RefCell::new(HashMap::default());
112}
113
114#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
115pub(in crate::db) struct QueryPlanCacheAttribution {
116    pub hits: u64,
117    pub misses: u64,
118}
119
120impl QueryPlanCacheAttribution {
121    #[must_use]
122    const fn hit() -> Self {
123        Self { hits: 1, misses: 0 }
124    }
125
126    #[must_use]
127    const fn miss() -> Self {
128        Self { hits: 0, misses: 1 }
129    }
130}
131
132///
133/// QueryExecutionAttribution
134///
135/// QueryExecutionAttribution records the top-level compile/execute split for
136/// typed/fluent query execution at the session boundary.
137///
138#[cfg(feature = "diagnostics")]
139#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
140pub struct QueryExecutionAttribution {
141    pub compile_local_instructions: u64,
142    pub runtime_local_instructions: u64,
143    pub finalize_local_instructions: u64,
144    pub direct_data_row_scan_local_instructions: u64,
145    pub direct_data_row_key_stream_local_instructions: u64,
146    pub direct_data_row_row_read_local_instructions: u64,
147    pub direct_data_row_key_encode_local_instructions: u64,
148    pub direct_data_row_store_get_local_instructions: u64,
149    pub direct_data_row_order_window_local_instructions: u64,
150    pub direct_data_row_page_window_local_instructions: u64,
151    pub grouped_stream_local_instructions: u64,
152    pub grouped_fold_local_instructions: u64,
153    pub grouped_finalize_local_instructions: u64,
154    pub grouped_count_borrowed_hash_computations: u64,
155    pub grouped_count_bucket_candidate_checks: u64,
156    pub grouped_count_existing_group_hits: u64,
157    pub grouped_count_new_group_inserts: u64,
158    pub grouped_count_row_materialization_local_instructions: u64,
159    pub grouped_count_group_lookup_local_instructions: u64,
160    pub grouped_count_existing_group_update_local_instructions: u64,
161    pub grouped_count_new_group_insert_local_instructions: u64,
162    pub response_decode_local_instructions: u64,
163    pub execute_local_instructions: u64,
164    pub total_local_instructions: u64,
165    pub shared_query_plan_cache_hits: u64,
166    pub shared_query_plan_cache_misses: u64,
167}
168
169#[cfg(feature = "diagnostics")]
170#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
171struct QueryExecutePhaseAttribution {
172    runtime_local_instructions: u64,
173    finalize_local_instructions: u64,
174    direct_data_row_scan_local_instructions: u64,
175    direct_data_row_key_stream_local_instructions: u64,
176    direct_data_row_row_read_local_instructions: u64,
177    direct_data_row_key_encode_local_instructions: u64,
178    direct_data_row_store_get_local_instructions: u64,
179    direct_data_row_order_window_local_instructions: u64,
180    direct_data_row_page_window_local_instructions: u64,
181    grouped_stream_local_instructions: u64,
182    grouped_fold_local_instructions: u64,
183    grouped_finalize_local_instructions: u64,
184    grouped_count: GroupedCountAttribution,
185}
186
187#[cfg(feature = "diagnostics")]
188#[expect(
189    clippy::missing_const_for_fn,
190    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
191)]
192fn read_query_local_instruction_counter() -> u64 {
193    #[cfg(target_arch = "wasm32")]
194    {
195        canic_cdk::api::performance_counter(1)
196    }
197
198    #[cfg(not(target_arch = "wasm32"))]
199    {
200        0
201    }
202}
203
204#[cfg(feature = "diagnostics")]
205fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
206    let start = read_query_local_instruction_counter();
207    let result = run();
208    let delta = read_query_local_instruction_counter().saturating_sub(start);
209
210    (delta, result)
211}
212
213impl<C: CanisterKind> DbSession<C> {
214    #[cfg(feature = "diagnostics")]
215    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
216        QueryExecutePhaseAttribution {
217            runtime_local_instructions: 0,
218            finalize_local_instructions: 0,
219            direct_data_row_scan_local_instructions: 0,
220            direct_data_row_key_stream_local_instructions: 0,
221            direct_data_row_row_read_local_instructions: 0,
222            direct_data_row_key_encode_local_instructions: 0,
223            direct_data_row_store_get_local_instructions: 0,
224            direct_data_row_order_window_local_instructions: 0,
225            direct_data_row_page_window_local_instructions: 0,
226            grouped_stream_local_instructions: 0,
227            grouped_fold_local_instructions: 0,
228            grouped_finalize_local_instructions: 0,
229            grouped_count: GroupedCountAttribution::none(),
230        }
231    }
232
233    #[cfg(feature = "diagnostics")]
234    const fn scalar_query_execute_phase_attribution(
235        phase: ScalarExecutePhaseAttribution,
236    ) -> QueryExecutePhaseAttribution {
237        QueryExecutePhaseAttribution {
238            runtime_local_instructions: phase.runtime_local_instructions,
239            finalize_local_instructions: phase.finalize_local_instructions,
240            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
241            direct_data_row_key_stream_local_instructions: phase
242                .direct_data_row_key_stream_local_instructions,
243            direct_data_row_row_read_local_instructions: phase
244                .direct_data_row_row_read_local_instructions,
245            direct_data_row_key_encode_local_instructions: phase
246                .direct_data_row_key_encode_local_instructions,
247            direct_data_row_store_get_local_instructions: phase
248                .direct_data_row_store_get_local_instructions,
249            direct_data_row_order_window_local_instructions: phase
250                .direct_data_row_order_window_local_instructions,
251            direct_data_row_page_window_local_instructions: phase
252                .direct_data_row_page_window_local_instructions,
253            grouped_stream_local_instructions: 0,
254            grouped_fold_local_instructions: 0,
255            grouped_finalize_local_instructions: 0,
256            grouped_count: GroupedCountAttribution::none(),
257        }
258    }
259
260    #[cfg(feature = "diagnostics")]
261    const fn grouped_query_execute_phase_attribution(
262        phase: GroupedExecutePhaseAttribution,
263    ) -> QueryExecutePhaseAttribution {
264        QueryExecutePhaseAttribution {
265            runtime_local_instructions: phase
266                .stream_local_instructions
267                .saturating_add(phase.fold_local_instructions),
268            finalize_local_instructions: phase.finalize_local_instructions,
269            direct_data_row_scan_local_instructions: 0,
270            direct_data_row_key_stream_local_instructions: 0,
271            direct_data_row_row_read_local_instructions: 0,
272            direct_data_row_key_encode_local_instructions: 0,
273            direct_data_row_store_get_local_instructions: 0,
274            direct_data_row_order_window_local_instructions: 0,
275            direct_data_row_page_window_local_instructions: 0,
276            grouped_stream_local_instructions: phase.stream_local_instructions,
277            grouped_fold_local_instructions: phase.fold_local_instructions,
278            grouped_finalize_local_instructions: phase.finalize_local_instructions,
279            grouped_count: phase.grouped_count,
280        }
281    }
282
283    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
284        let scope_id = self.db.cache_scope_id();
285
286        QUERY_PLAN_CACHES.with(|caches| {
287            let mut caches = caches.borrow_mut();
288            let cache = caches.entry(scope_id).or_default();
289
290            f(cache)
291        })
292    }
293
294    const fn visible_indexes_for_model(
295        model: &'static EntityModel,
296        visibility: QueryPlanVisibility,
297    ) -> VisibleIndexes<'static> {
298        match visibility {
299            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
300            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
301        }
302    }
303
304    #[cfg(test)]
305    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
306        self.with_query_plan_cache(|cache| cache.len())
307    }
308
309    #[cfg(test)]
310    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
311        self.with_query_plan_cache(QueryPlanCache::clear);
312    }
313
314    pub(in crate::db) fn query_plan_visibility_for_store_path(
315        &self,
316        store_path: &'static str,
317    ) -> Result<QueryPlanVisibility, QueryError> {
318        let store = self
319            .db
320            .recovered_store(store_path)
321            .map_err(QueryError::execute)?;
322        let visibility = if store.index_state() == crate::db::IndexState::Ready {
323            QueryPlanVisibility::StoreReady
324        } else {
325            QueryPlanVisibility::StoreNotReady
326        };
327
328        Ok(visibility)
329    }
330
331    pub(in crate::db) fn cached_query_plan_entry_for_authority(
332        &self,
333        authority: crate::db::executor::EntityAuthority,
334        schema_fingerprint: CommitSchemaFingerprint,
335        query: &StructuralQuery,
336    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
337        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
338        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
339        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
340        let cache_key =
341            QueryPlanCacheKey::for_authority_with_normalized_predicate_and_method_version(
342                authority,
343                schema_fingerprint,
344                visibility,
345                query,
346                normalized_predicate.as_ref(),
347                SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
348            );
349
350        {
351            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
352            if let Some(entry) = cached {
353                return Ok((entry, QueryPlanCacheAttribution::hit()));
354            }
355        }
356
357        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
358            &visible_indexes,
359            normalized_predicate,
360        )?;
361        let entry = QueryPlanCacheEntry::new(
362            plan.clone(),
363            SharedPreparedExecutionPlan::from_plan(authority, plan),
364        );
365        self.with_query_plan_cache(|cache| {
366            cache.insert(cache_key, entry.clone());
367        });
368
369        Ok((entry, QueryPlanCacheAttribution::miss()))
370    }
371
372    #[cfg(test)]
373    pub(in crate::db) fn query_plan_cache_key_for_tests(
374        authority: crate::db::executor::EntityAuthority,
375        schema_fingerprint: CommitSchemaFingerprint,
376        visibility: QueryPlanVisibility,
377        query: &StructuralQuery,
378        cache_method_version: u8,
379    ) -> QueryPlanCacheKey {
380        QueryPlanCacheKey::for_authority_with_method_version(
381            authority,
382            schema_fingerprint,
383            visibility,
384            query,
385            cache_method_version,
386        )
387    }
388
389    // Resolve the planner-visible index slice for one typed query exactly once
390    // at the session boundary before handing execution/planning off to query-owned logic.
391    fn with_query_visible_indexes<E, T>(
392        &self,
393        query: &Query<E>,
394        op: impl FnOnce(
395            &Query<E>,
396            &crate::db::query::plan::VisibleIndexes<'static>,
397        ) -> Result<T, QueryError>,
398    ) -> Result<T, QueryError>
399    where
400        E: EntityKind<Canister = C>,
401    {
402        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
403        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
404
405        op(query, &visible_indexes)
406    }
407
408    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
409        &self,
410        query: &StructuralQuery,
411    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
412    where
413        E: EntityKind<Canister = C>,
414    {
415        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
416            crate::db::executor::EntityAuthority::for_type::<E>(),
417            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
418            query,
419        )?;
420
421        Ok((entry.typed_prepared_plan::<E>(), attribution))
422    }
423
424    // Compile one typed query using only the indexes currently visible for the
425    // query's recovered store.
426    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
427        &self,
428        query: &Query<E>,
429    ) -> Result<CompiledQuery<E>, QueryError>
430    where
431        E: EntityKind<Canister = C>,
432    {
433        let (entry, _) = self.cached_query_plan_entry_for_authority(
434            crate::db::executor::EntityAuthority::for_type::<E>(),
435            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
436            query.structural(),
437        )?;
438
439        Ok(Query::<E>::compiled_query_from_plan(
440            entry.logical_plan().clone(),
441        ))
442    }
443
444    // Build one logical planned-query shell using only the indexes currently
445    // visible for the query's recovered store.
446    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
447        &self,
448        query: &Query<E>,
449    ) -> Result<PlannedQuery<E>, QueryError>
450    where
451        E: EntityKind<Canister = C>,
452    {
453        let (entry, _) = self.cached_query_plan_entry_for_authority(
454            crate::db::executor::EntityAuthority::for_type::<E>(),
455            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
456            query.structural(),
457        )?;
458
459        Ok(Query::<E>::planned_query_from_plan(
460            entry.logical_plan().clone(),
461        ))
462    }
463
464    // Project one logical explain payload using only planner-visible indexes.
465    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
466        &self,
467        query: &Query<E>,
468    ) -> Result<ExplainPlan, QueryError>
469    where
470        E: EntityKind<Canister = C>,
471    {
472        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
473    }
474
475    // Hash one typed query plan using only the indexes currently visible for
476    // the query's recovered store.
477    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
478        &self,
479        query: &Query<E>,
480    ) -> Result<String, QueryError>
481    where
482        E: EntityKind<Canister = C>,
483    {
484        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
485    }
486
487    // Explain one load execution shape using only planner-visible
488    // indexes from the recovered store state.
489    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
490        &self,
491        query: &Query<E>,
492    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
493    where
494        E: EntityValue + EntityKind<Canister = C>,
495    {
496        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
497    }
498
499    // Render one load execution descriptor plus route diagnostics using
500    // only planner-visible indexes from the recovered store state.
501    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
502        &self,
503        query: &Query<E>,
504    ) -> Result<String, QueryError>
505    where
506        E: EntityValue + EntityKind<Canister = C>,
507    {
508        self.with_query_visible_indexes(
509            query,
510            Query::<E>::explain_execution_verbose_with_visible_indexes,
511        )
512    }
513
514    // Explain one prepared fluent aggregate terminal using only
515    // planner-visible indexes from the recovered store state.
516    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
517        &self,
518        query: &Query<E>,
519        strategy: &S,
520    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
521    where
522        E: EntityValue + EntityKind<Canister = C>,
523        S: PreparedFluentAggregateExplainStrategy,
524    {
525        self.with_query_visible_indexes(query, |query, visible_indexes| {
526            query
527                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
528        })
529    }
530
531    // Explain one `bytes_by(field)` terminal using only planner-visible
532    // indexes from the recovered store state.
533    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
534        &self,
535        query: &Query<E>,
536        target_field: &str,
537    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
538    where
539        E: EntityValue + EntityKind<Canister = C>,
540    {
541        self.with_query_visible_indexes(query, |query, visible_indexes| {
542            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
543        })
544    }
545
546    // Explain one prepared fluent projection terminal using only
547    // planner-visible indexes from the recovered store state.
548    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
549        &self,
550        query: &Query<E>,
551        strategy: &PreparedFluentProjectionStrategy,
552    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
553    where
554        E: EntityValue + EntityKind<Canister = C>,
555    {
556        self.with_query_visible_indexes(query, |query, visible_indexes| {
557            query.explain_prepared_projection_terminal_with_visible_indexes(
558                visible_indexes,
559                strategy,
560            )
561        })
562    }
563
564    // Validate that one execution strategy is admissible for scalar paged load
565    // execution and fail closed on grouped/primary-key-only routes.
566    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
567        match family {
568            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
569                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
570            )),
571            ExecutionFamily::Ordered => Ok(()),
572            ExecutionFamily::Grouped => Err(QueryError::invariant(
573                "grouped queries execute via execute(), not page().execute()",
574            )),
575        }
576    }
577
578    // Validate that one execution strategy is admissible for the grouped
579    // execution surface.
580    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
581        match family {
582            ExecutionFamily::Grouped => Ok(()),
583            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
584                "grouped execution requires grouped logical plans",
585            )),
586        }
587    }
588
589    // Finalize one grouped cursor page into the outward grouped execution
590    // payload so grouped cursor encoding and continuation-shape validation
591    // stay owned by the session boundary.
592    fn finalize_grouped_execution_page(
593        page: GroupedCursorPage,
594        trace: Option<ExecutionTrace>,
595    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
596        let next_cursor = page
597            .next_cursor
598            .map(|token| {
599                let Some(token) = token.as_grouped() else {
600                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
601                };
602
603                token.encode().map_err(|err| {
604                    QueryError::serialize_internal(format!(
605                        "failed to serialize grouped continuation cursor: {err}"
606                    ))
607                })
608            })
609            .transpose()?;
610
611        Ok(PagedGroupedExecutionWithTrace::new(
612            page.rows,
613            next_cursor,
614            trace,
615        ))
616    }
617
618    // Execute one prepared grouped query result while returning the same empty
619    // scalar-attribution shell used by the grouped execution path in the
620    // diagnostics surface.
621    #[cfg(feature = "diagnostics")]
622    fn execute_grouped_query_result_with_attribution<E>(
623        &self,
624        plan: PreparedExecutionPlan<E>,
625    ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
626    where
627        E: PersistedRow<Canister = C> + EntityValue,
628    {
629        let (page, trace, phase_attribution) =
630            self.execute_grouped_plan_with_trace_with_phase_attribution(plan, None)?;
631        let grouped = Self::finalize_grouped_execution_page(page, trace)?;
632
633        Ok((
634            LoadQueryResult::Grouped(grouped),
635            Self::grouped_query_execute_phase_attribution(phase_attribution),
636            0,
637        ))
638    }
639
640    // Execute one non-grouped prepared query result while normalizing load vs
641    // delete output into the shared outward load-result contract plus the
642    // scalar execution attribution tuple expected by the perf surface.
643    #[cfg(feature = "diagnostics")]
644    fn execute_scalar_query_result_with_attribution<E>(
645        &self,
646        mode: QueryMode,
647        plan: PreparedExecutionPlan<E>,
648    ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
649    where
650        E: PersistedRow<Canister = C> + EntityValue,
651    {
652        match mode {
653            QueryMode::Load(_) => {
654                let (rows, phase_attribution, response_decode_local_instructions) = self
655                    .load_executor::<E>()
656                    .execute_with_phase_attribution(plan)
657                    .map_err(QueryError::execute)?;
658
659                Ok((
660                    LoadQueryResult::Rows(rows),
661                    Self::scalar_query_execute_phase_attribution(phase_attribution),
662                    response_decode_local_instructions,
663                ))
664            }
665            QueryMode::Delete(_) => {
666                let result = self.execute_query_dyn(mode, plan)?;
667
668                Ok((
669                    LoadQueryResult::Rows(result),
670                    Self::empty_query_execute_phase_attribution(),
671                    0,
672                ))
673            }
674        }
675    }
676
677    /// Execute one scalar load/delete query and return materialized response rows.
678    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
679    where
680        E: PersistedRow<Canister = C> + EntityValue,
681    {
682        // Phase 1: compile typed intent into one prepared execution-plan contract.
683        let mode = query.mode();
684        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
685
686        // Phase 2: delegate execution to the shared compiled-plan entry path.
687        self.execute_query_dyn(mode, plan)
688    }
689
690    /// Execute one typed query while reporting the compile/execute split at
691    /// the shared fluent query seam.
692    #[cfg(feature = "diagnostics")]
693    #[doc(hidden)]
694    pub fn execute_query_result_with_attribution<E>(
695        &self,
696        query: &Query<E>,
697    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
698    where
699        E: PersistedRow<Canister = C> + EntityValue,
700    {
701        // Phase 1: measure compile work at the typed/fluent boundary,
702        // including the shared lower query-plan cache lookup/build exactly
703        // once. This preserves honest hit/miss attribution without
704        // double-building plans on one-shot cache misses.
705        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
706            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
707        });
708        let (plan, cache_attribution) = plan_and_cache?;
709
710        // Phase 2: execute one query result using the prepared plan produced
711        // by the compile/cache boundary above.
712        let (execute_local_instructions, result) = measure_query_stage(|| {
713            if query.has_grouping() {
714                self.execute_grouped_query_result_with_attribution(plan)
715            } else {
716                self.execute_scalar_query_result_with_attribution(query.mode(), plan)
717            }
718        });
719        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
720        let total_local_instructions =
721            compile_local_instructions.saturating_add(execute_local_instructions);
722
723        Ok((
724            result,
725            QueryExecutionAttribution {
726                compile_local_instructions,
727                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
728                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
729                direct_data_row_scan_local_instructions: execute_phase_attribution
730                    .direct_data_row_scan_local_instructions,
731                direct_data_row_key_stream_local_instructions: execute_phase_attribution
732                    .direct_data_row_key_stream_local_instructions,
733                direct_data_row_row_read_local_instructions: execute_phase_attribution
734                    .direct_data_row_row_read_local_instructions,
735                direct_data_row_key_encode_local_instructions: execute_phase_attribution
736                    .direct_data_row_key_encode_local_instructions,
737                direct_data_row_store_get_local_instructions: execute_phase_attribution
738                    .direct_data_row_store_get_local_instructions,
739                direct_data_row_order_window_local_instructions: execute_phase_attribution
740                    .direct_data_row_order_window_local_instructions,
741                direct_data_row_page_window_local_instructions: execute_phase_attribution
742                    .direct_data_row_page_window_local_instructions,
743                grouped_stream_local_instructions: execute_phase_attribution
744                    .grouped_stream_local_instructions,
745                grouped_fold_local_instructions: execute_phase_attribution
746                    .grouped_fold_local_instructions,
747                grouped_finalize_local_instructions: execute_phase_attribution
748                    .grouped_finalize_local_instructions,
749                grouped_count_borrowed_hash_computations: execute_phase_attribution
750                    .grouped_count
751                    .borrowed_hash_computations,
752                grouped_count_bucket_candidate_checks: execute_phase_attribution
753                    .grouped_count
754                    .bucket_candidate_checks,
755                grouped_count_existing_group_hits: execute_phase_attribution
756                    .grouped_count
757                    .existing_group_hits,
758                grouped_count_new_group_inserts: execute_phase_attribution
759                    .grouped_count
760                    .new_group_inserts,
761                grouped_count_row_materialization_local_instructions: execute_phase_attribution
762                    .grouped_count
763                    .row_materialization_local_instructions,
764                grouped_count_group_lookup_local_instructions: execute_phase_attribution
765                    .grouped_count
766                    .group_lookup_local_instructions,
767                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
768                    .grouped_count
769                    .existing_group_update_local_instructions,
770                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
771                    .grouped_count
772                    .new_group_insert_local_instructions,
773                response_decode_local_instructions,
774                execute_local_instructions,
775                total_local_instructions,
776                shared_query_plan_cache_hits: cache_attribution.hits,
777                shared_query_plan_cache_misses: cache_attribution.misses,
778            },
779        ))
780    }
781
782    // Execute one typed query through the unified row/grouped result surface so
783    // higher layers do not need to branch on grouped shape themselves.
784    #[doc(hidden)]
785    pub fn execute_query_result<E>(
786        &self,
787        query: &Query<E>,
788    ) -> Result<LoadQueryResult<E>, QueryError>
789    where
790        E: PersistedRow<Canister = C> + EntityValue,
791    {
792        if query.has_grouping() {
793            return self
794                .execute_grouped(query, None)
795                .map(LoadQueryResult::Grouped);
796        }
797
798        self.execute_query(query).map(LoadQueryResult::Rows)
799    }
800
801    /// Execute one typed delete query and return only the affected-row count.
802    #[doc(hidden)]
803    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
804    where
805        E: PersistedRow<Canister = C> + EntityValue,
806    {
807        // Phase 1: fail closed if the caller routes a non-delete query here.
808        if !query.mode().is_delete() {
809            return Err(QueryError::unsupported_query(
810                "delete count execution requires delete query mode",
811            ));
812        }
813
814        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
815        let plan = self
816            .compile_query_with_visible_indexes(query)?
817            .into_prepared_execution_plan();
818
819        // Phase 3: execute the shared delete core while skipping response-row materialization.
820        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
821            .map_err(QueryError::execute)
822    }
823
824    /// Execute one scalar query from one pre-built prepared execution contract.
825    ///
826    /// This is the shared compiled-plan entry boundary used by the typed
827    /// `execute_query(...)` surface and adjacent query execution facades.
828    pub(in crate::db) fn execute_query_dyn<E>(
829        &self,
830        mode: QueryMode,
831        plan: PreparedExecutionPlan<E>,
832    ) -> Result<EntityResponse<E>, QueryError>
833    where
834        E: PersistedRow<Canister = C> + EntityValue,
835    {
836        let result = match mode {
837            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
838            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
839        };
840
841        result.map_err(QueryError::execute)
842    }
843
844    // Shared load-query terminal wrapper: build plan, run under metrics, map
845    // execution errors into query-facing errors.
846    pub(in crate::db) fn execute_load_query_with<E, T>(
847        &self,
848        query: &Query<E>,
849        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
850    ) -> Result<T, QueryError>
851    where
852        E: PersistedRow<Canister = C> + EntityValue,
853    {
854        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
855
856        self.with_metrics(|| op(self.load_executor::<E>(), plan))
857            .map_err(QueryError::execute)
858    }
859
860    /// Build one trace payload for a query without executing it.
861    ///
862    /// This lightweight surface is intended for developer diagnostics:
863    /// plan hash, access strategy summary, and planner/executor route shape.
864    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
865    where
866        E: EntityKind<Canister = C>,
867    {
868        let compiled = self.compile_query_with_visible_indexes(query)?;
869        let explain = compiled.explain();
870        let plan_hash = compiled.plan_hash_hex();
871
872        let (executable, _) =
873            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
874        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
875        let execution_family = match query.mode() {
876            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
877            QueryMode::Delete(_) => None,
878        };
879
880        Ok(QueryTracePlan::new(
881            plan_hash,
882            access_strategy,
883            execution_family,
884            explain,
885        ))
886    }
887
888    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
889    pub(crate) fn execute_load_query_paged_with_trace<E>(
890        &self,
891        query: &Query<E>,
892        cursor_token: Option<&str>,
893    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
894    where
895        E: PersistedRow<Canister = C> + EntityValue,
896    {
897        // Phase 1: build/validate prepared execution plan and reject grouped plans.
898        let plan = self
899            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
900            .0;
901        Self::ensure_scalar_paged_execution_family(
902            plan.execution_family().map_err(QueryError::execute)?,
903        )?;
904
905        // Phase 2: decode external cursor token and validate it against plan surface.
906        let cursor_bytes = decode_optional_cursor_token(cursor_token)
907            .map_err(QueryError::from_cursor_plan_error)?;
908        let cursor = plan
909            .prepare_cursor(cursor_bytes.as_deref())
910            .map_err(QueryError::from_executor_plan_error)?;
911
912        // Phase 3: execute one traced page and encode outbound continuation token.
913        let (page, trace) = self
914            .with_metrics(|| {
915                self.load_executor::<E>()
916                    .execute_paged_with_cursor_traced(plan, cursor)
917            })
918            .map_err(QueryError::execute)?;
919        let next_cursor = page
920            .next_cursor
921            .map(|token| {
922                let Some(token) = token.as_scalar() else {
923                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
924                };
925
926                token.encode().map_err(|err| {
927                    QueryError::serialize_internal(format!(
928                        "failed to serialize continuation cursor: {err}"
929                    ))
930                })
931            })
932            .transpose()?;
933
934        Ok(PagedLoadExecutionWithTrace::new(
935            page.items,
936            next_cursor,
937            trace,
938        ))
939    }
940
941    /// Execute one grouped query page with optional grouped continuation cursor.
942    ///
943    /// This is the explicit grouped execution boundary; scalar load APIs reject
944    /// grouped plans to preserve scalar response contracts.
945    pub(in crate::db) fn execute_grouped<E>(
946        &self,
947        query: &Query<E>,
948        cursor_token: Option<&str>,
949    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
950    where
951        E: PersistedRow<Canister = C> + EntityValue,
952    {
953        // Phase 1: build the prepared execution plan once from the typed query.
954        let plan = self
955            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
956            .0;
957
958        // Phase 2: reuse the shared prepared grouped execution path and then
959        // finalize the outward grouped payload at the session boundary.
960        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
961
962        Self::finalize_grouped_execution_page(page, trace)
963    }
964
965    // Execute one grouped prepared plan page with optional grouped cursor.
966    fn execute_grouped_plan_with_trace<E>(
967        &self,
968        plan: PreparedExecutionPlan<E>,
969        cursor_token: Option<&str>,
970    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
971    where
972        E: PersistedRow<Canister = C> + EntityValue,
973    {
974        // Phase 1: validate the prepared plan shape before decoding cursors.
975        Self::ensure_grouped_execution_family(
976            plan.execution_family().map_err(QueryError::execute)?,
977        )?;
978
979        // Phase 2: decode external grouped cursor token and validate against plan.
980        let cursor = decode_optional_grouped_cursor_token(cursor_token)
981            .map_err(QueryError::from_cursor_plan_error)?;
982        let cursor = plan
983            .prepare_grouped_cursor_token(cursor)
984            .map_err(QueryError::from_executor_plan_error)?;
985
986        // Phase 3: execute one grouped page while preserving the structural
987        // grouped cursor payload for whichever outward cursor format the caller needs.
988        self.with_metrics(|| {
989            self.load_executor::<E>()
990                .execute_grouped_paged_with_cursor_traced(plan, cursor)
991        })
992        .map_err(QueryError::execute)
993    }
994
995    #[cfg(feature = "diagnostics")]
996    fn execute_grouped_plan_with_trace_with_phase_attribution<E>(
997        &self,
998        plan: PreparedExecutionPlan<E>,
999        cursor_token: Option<&str>,
1000    ) -> Result<
1001        (
1002            GroupedCursorPage,
1003            Option<ExecutionTrace>,
1004            GroupedExecutePhaseAttribution,
1005        ),
1006        QueryError,
1007    >
1008    where
1009        E: PersistedRow<Canister = C> + EntityValue,
1010    {
1011        Self::ensure_grouped_execution_family(
1012            plan.execution_family().map_err(QueryError::execute)?,
1013        )?;
1014
1015        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1016            .map_err(QueryError::from_cursor_plan_error)?;
1017        let cursor = plan
1018            .prepare_grouped_cursor_token(cursor)
1019            .map_err(QueryError::from_executor_plan_error)?;
1020
1021        self.with_metrics(|| {
1022            self.load_executor::<E>()
1023                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(plan, cursor)
1024        })
1025        .map_err(QueryError::execute)
1026    }
1027}
1028
1029impl QueryPlanCacheKey {
1030    // Assemble the canonical cache-key shell once so the test and
1031    // normalized-predicate constructors only decide which structural query key
1032    // they feed into the shared session cache identity.
1033    const fn from_authority_parts(
1034        authority: crate::db::executor::EntityAuthority,
1035        schema_fingerprint: CommitSchemaFingerprint,
1036        visibility: QueryPlanVisibility,
1037        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1038        cache_method_version: u8,
1039    ) -> Self {
1040        Self {
1041            cache_method_version,
1042            entity_path: authority.entity_path(),
1043            schema_fingerprint,
1044            visibility,
1045            structural_query,
1046        }
1047    }
1048
1049    #[cfg(test)]
1050    fn for_authority_with_method_version(
1051        authority: crate::db::executor::EntityAuthority,
1052        schema_fingerprint: CommitSchemaFingerprint,
1053        visibility: QueryPlanVisibility,
1054        query: &StructuralQuery,
1055        cache_method_version: u8,
1056    ) -> Self {
1057        Self::from_authority_parts(
1058            authority,
1059            schema_fingerprint,
1060            visibility,
1061            query.structural_cache_key(),
1062            cache_method_version,
1063        )
1064    }
1065
1066    fn for_authority_with_normalized_predicate_and_method_version(
1067        authority: crate::db::executor::EntityAuthority,
1068        schema_fingerprint: CommitSchemaFingerprint,
1069        visibility: QueryPlanVisibility,
1070        query: &StructuralQuery,
1071        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1072        cache_method_version: u8,
1073    ) -> Self {
1074        Self::from_authority_parts(
1075            authority,
1076            schema_fingerprint,
1077            visibility,
1078            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1079            cache_method_version,
1080        )
1081    }
1082}