// icydb_core/db/session/query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        access::AccessStrategy,
16        commit::CommitSchemaFingerprint,
17        cursor::{
18            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19        },
20        diagnostics::ExecutionTrace,
21        executor::{
22            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23            SharedPreparedExecutionPlan,
24        },
25        query::builder::{
26            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
27        },
28        query::explain::{
29            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
30        },
31        query::{
32            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
33            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
34        },
35    },
36    error::InternalError,
37    model::entity::EntityModel,
38    traits::{CanisterKind, EntityKind, EntityValue, Path},
39};
40#[cfg(feature = "diagnostics")]
41use candid::CandidType;
42use icydb_utils::Xxh3;
43#[cfg(feature = "diagnostics")]
44use serde::Deserialize;
45use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
46
// Hasher factory shared by this module's in-heap cache maps; each map built
// with it hashes keys through a default-constructed `Xxh3`.
type CacheBuildHasher = BuildHasherDefault<Xxh3>;
48
// Version tag the session lookup path passes into the shared query-plan cache
// key. Bump this when the shared lower query-plan cache key meaning changes in
// a way that must force old in-heap entries to miss instead of aliasing.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
52
///
/// QueryPlanVisibility
///
/// Planner-facing index visibility derived from one recovered store's index
/// state. It selects which indexes the planner may see and is one component
/// of the shared query-plan cache key.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
    // Store index state is anything other than `Ready`; the planner sees no
    // indexes.
    StoreNotReady,
    // Store index state is `Ready`; the planner sees the model's
    // planner-visible indexes.
    StoreReady,
}
58
///
/// QueryPlanCacheKey
///
/// Identity of one entry in the shared in-heap query-plan cache. Two lookups
/// resolve to the same cached plan only when every component compares equal.
///
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
    // Cache-key method version; the session lookup path supplies
    // `SHARED_QUERY_PLAN_CACHE_METHOD_VERSION`.
    cache_method_version: u8,
    // NOTE(review): presumably the path of the queried entity; the
    // constructor that fills this in is outside this view — confirm.
    entity_path: &'static str,
    // Commit schema fingerprint supplied by the caller for the entity.
    schema_fingerprint: CommitSchemaFingerprint,
    // Index visibility the plan is built under.
    visibility: QueryPlanVisibility,
    // Structural cache key derived from the query intent.
    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
67
///
/// QueryPlanCacheEntry
///
/// One cached planning result: the logical access plan together with the
/// shared prepared execution plan stored alongside it.
///
#[derive(Clone, Debug)]
pub(in crate::db) struct QueryPlanCacheEntry {
    logical_plan: AccessPlannedQuery,
    prepared_plan: SharedPreparedExecutionPlan,
}
73
impl QueryPlanCacheEntry {
    // Bundle one logical plan with its shared prepared execution plan.
    #[must_use]
    pub(in crate::db) const fn new(
        logical_plan: AccessPlannedQuery,
        prepared_plan: SharedPreparedExecutionPlan,
    ) -> Self {
        Self {
            logical_plan,
            prepared_plan,
        }
    }

    // Borrow the cached logical plan.
    #[must_use]
    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
        &self.logical_plan
    }

    // Produce an entity-typed prepared plan by calling `typed_clone::<E>()`
    // on the shared prepared plan.
    #[must_use]
    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
        self.prepared_plan.typed_clone::<E>()
    }

    // Borrow the shared (untyped) prepared plan.
    #[must_use]
    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
        &self.prepared_plan
    }
}
101
// Map from cache key to cached planning result for one cache scope.
pub(in crate::db) type QueryPlanCache =
    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
104
thread_local! {
    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
    // facades can share prepared logical plans across update/query calls while
    // tests and multi-registry host processes remain isolated by registry
    // identity. The outer map is keyed by the `usize` scope id that sessions
    // read from `db.cache_scope_id()`.
    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
113
///
/// QueryPlanCacheAttribution
///
/// Hit/miss counters reported for shared query-plan cache lookups. One lookup
/// yields either one hit or one miss.
///
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
    pub hits: u64,
    pub misses: u64,
}
119
impl QueryPlanCacheAttribution {
    // Attribution for one lookup served from the cache.
    #[must_use]
    const fn hit() -> Self {
        Self { hits: 1, misses: 0 }
    }

    // Attribution for one lookup that had to build and store a new plan.
    #[must_use]
    const fn miss() -> Self {
        Self { hits: 0, misses: 1 }
    }
}
131
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution records the top-level compile/execute split for
/// typed/fluent query execution at the session boundary, flattened together
/// with the executor-reported phase breakdown and the shared query-plan cache
/// hit/miss counters.
///
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    // Instruction delta measured around the cached plan lookup/build.
    pub compile_local_instructions: u64,
    // Executor-reported phase breakdown; scalar loads fill the
    // `direct_data_row_*` counters, grouped execution fills the `grouped_*`
    // counters, and the other family's counters stay zero.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    // Response-decode instruction count reported by the scalar load path;
    // grouped and delete execution report zero.
    pub response_decode_local_instructions: u64,
    // Instruction delta measured around the whole execution stage.
    pub execute_local_instructions: u64,
    // Saturating sum of the compile and execute deltas.
    pub total_local_instructions: u64,
    // Shared query-plan cache outcome for this call's single lookup.
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
168
// Session-internal phase breakdown that scalar and grouped executor
// attributions are both converted into before being flattened into the public
// `QueryExecutionAttribution` record.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    // Grouped-count counters carried through unchanged from the executor.
    grouped_count: GroupedCountAttribution,
}
186
// Read the local instruction counter used for diagnostics attribution.
//
// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`; on every
// other target it returns `0`, so instruction deltas measured off-wasm are
// always zero.
//
// The `missing_const_for_fn` expectation is scoped to non-wasm targets: only
// there does the body reduce to a constant, so only there can the lint fire.
// An unconditional `#[expect]` would be reported as unfulfilled whenever the
// crate is linted for wasm32.
#[cfg(feature = "diagnostics")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
203
// Run one fallible stage and report how far the local instruction counter
// advanced while it ran. The stage result is returned untouched next to the
// saturating counter delta.
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let counter_before = read_query_local_instruction_counter();
    let outcome = run();
    let counter_after = read_query_local_instruction_counter();

    (counter_after.saturating_sub(counter_before), outcome)
}
212
213impl<C: CanisterKind> DbSession<C> {
214    #[cfg(feature = "diagnostics")]
215    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
216        QueryExecutePhaseAttribution {
217            runtime_local_instructions: 0,
218            finalize_local_instructions: 0,
219            direct_data_row_scan_local_instructions: 0,
220            direct_data_row_key_stream_local_instructions: 0,
221            direct_data_row_row_read_local_instructions: 0,
222            direct_data_row_key_encode_local_instructions: 0,
223            direct_data_row_store_get_local_instructions: 0,
224            direct_data_row_order_window_local_instructions: 0,
225            direct_data_row_page_window_local_instructions: 0,
226            grouped_stream_local_instructions: 0,
227            grouped_fold_local_instructions: 0,
228            grouped_finalize_local_instructions: 0,
229            grouped_count: GroupedCountAttribution::none(),
230        }
231    }
232
233    #[cfg(feature = "diagnostics")]
234    const fn scalar_query_execute_phase_attribution(
235        phase: ScalarExecutePhaseAttribution,
236    ) -> QueryExecutePhaseAttribution {
237        QueryExecutePhaseAttribution {
238            runtime_local_instructions: phase.runtime_local_instructions,
239            finalize_local_instructions: phase.finalize_local_instructions,
240            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
241            direct_data_row_key_stream_local_instructions: phase
242                .direct_data_row_key_stream_local_instructions,
243            direct_data_row_row_read_local_instructions: phase
244                .direct_data_row_row_read_local_instructions,
245            direct_data_row_key_encode_local_instructions: phase
246                .direct_data_row_key_encode_local_instructions,
247            direct_data_row_store_get_local_instructions: phase
248                .direct_data_row_store_get_local_instructions,
249            direct_data_row_order_window_local_instructions: phase
250                .direct_data_row_order_window_local_instructions,
251            direct_data_row_page_window_local_instructions: phase
252                .direct_data_row_page_window_local_instructions,
253            grouped_stream_local_instructions: 0,
254            grouped_fold_local_instructions: 0,
255            grouped_finalize_local_instructions: 0,
256            grouped_count: GroupedCountAttribution::none(),
257        }
258    }
259
260    #[cfg(feature = "diagnostics")]
261    const fn grouped_query_execute_phase_attribution(
262        phase: GroupedExecutePhaseAttribution,
263    ) -> QueryExecutePhaseAttribution {
264        QueryExecutePhaseAttribution {
265            runtime_local_instructions: phase
266                .stream_local_instructions
267                .saturating_add(phase.fold_local_instructions),
268            finalize_local_instructions: phase.finalize_local_instructions,
269            direct_data_row_scan_local_instructions: 0,
270            direct_data_row_key_stream_local_instructions: 0,
271            direct_data_row_row_read_local_instructions: 0,
272            direct_data_row_key_encode_local_instructions: 0,
273            direct_data_row_store_get_local_instructions: 0,
274            direct_data_row_order_window_local_instructions: 0,
275            direct_data_row_page_window_local_instructions: 0,
276            grouped_stream_local_instructions: phase.stream_local_instructions,
277            grouped_fold_local_instructions: phase.fold_local_instructions,
278            grouped_finalize_local_instructions: phase.finalize_local_instructions,
279            grouped_count: phase.grouped_count,
280        }
281    }
282
283    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
284        let scope_id = self.db.cache_scope_id();
285
286        QUERY_PLAN_CACHES.with(|caches| {
287            let mut caches = caches.borrow_mut();
288            let cache = caches.entry(scope_id).or_default();
289
290            f(cache)
291        })
292    }
293
294    const fn visible_indexes_for_model(
295        model: &'static EntityModel,
296        visibility: QueryPlanVisibility,
297    ) -> VisibleIndexes<'static> {
298        match visibility {
299            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
300            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
301        }
302    }
303
304    #[cfg(test)]
305    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
306        self.with_query_plan_cache(|cache| cache.len())
307    }
308
309    #[cfg(test)]
310    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
311        self.with_query_plan_cache(QueryPlanCache::clear);
312    }
313
314    pub(in crate::db) fn query_plan_visibility_for_store_path(
315        &self,
316        store_path: &'static str,
317    ) -> Result<QueryPlanVisibility, QueryError> {
318        let store = self
319            .db
320            .recovered_store(store_path)
321            .map_err(QueryError::execute)?;
322        let visibility = if store.index_state() == crate::db::IndexState::Ready {
323            QueryPlanVisibility::StoreReady
324        } else {
325            QueryPlanVisibility::StoreNotReady
326        };
327
328        Ok(visibility)
329    }
330
331    pub(in crate::db) fn cached_query_plan_entry_for_authority(
332        &self,
333        authority: crate::db::executor::EntityAuthority,
334        schema_fingerprint: CommitSchemaFingerprint,
335        query: &StructuralQuery,
336    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
337        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
338        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
339        let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
340        let cache_key =
341            QueryPlanCacheKey::for_authority_with_normalized_predicate_and_method_version(
342                authority,
343                schema_fingerprint,
344                visibility,
345                query,
346                normalized_predicate.as_ref(),
347                SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
348            );
349
350        {
351            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
352            if let Some(entry) = cached {
353                return Ok((entry, QueryPlanCacheAttribution::hit()));
354            }
355        }
356
357        let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
358            &visible_indexes,
359            normalized_predicate,
360        )?;
361        let entry = QueryPlanCacheEntry::new(
362            plan.clone(),
363            SharedPreparedExecutionPlan::from_plan(authority, plan),
364        );
365        self.with_query_plan_cache(|cache| {
366            cache.insert(cache_key, entry.clone());
367        });
368
369        Ok((entry, QueryPlanCacheAttribution::miss()))
370    }
371
    // Test-only: build a cache key from the given components with a
    // caller-chosen `cache_method_version` rather than the module constant.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_key_for_tests(
        authority: crate::db::executor::EntityAuthority,
        schema_fingerprint: CommitSchemaFingerprint,
        visibility: QueryPlanVisibility,
        query: &StructuralQuery,
        cache_method_version: u8,
    ) -> QueryPlanCacheKey {
        QueryPlanCacheKey::for_authority_with_method_version(
            authority,
            schema_fingerprint,
            visibility,
            query,
            cache_method_version,
        )
    }
388
389    // Resolve the planner-visible index slice for one typed query exactly once
390    // at the session boundary before handing execution/planning off to query-owned logic.
391    fn with_query_visible_indexes<E, T>(
392        &self,
393        query: &Query<E>,
394        op: impl FnOnce(
395            &Query<E>,
396            &crate::db::query::plan::VisibleIndexes<'static>,
397        ) -> Result<T, QueryError>,
398    ) -> Result<T, QueryError>
399    where
400        E: EntityKind<Canister = C>,
401    {
402        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
403        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
404
405        op(query, &visible_indexes)
406    }
407
408    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
409        &self,
410        query: &StructuralQuery,
411    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
412    where
413        E: EntityKind<Canister = C>,
414    {
415        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
416            crate::db::executor::EntityAuthority::for_type::<E>(),
417            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
418            query,
419        )?;
420
421        Ok((entry.typed_prepared_plan::<E>(), attribution))
422    }
423
424    // Compile one typed query using only the indexes currently visible for the
425    // query's recovered store.
426    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
427        &self,
428        query: &Query<E>,
429    ) -> Result<CompiledQuery<E>, QueryError>
430    where
431        E: EntityKind<Canister = C>,
432    {
433        let (entry, _) = self.cached_query_plan_entry_for_authority(
434            crate::db::executor::EntityAuthority::for_type::<E>(),
435            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
436            query.structural(),
437        )?;
438
439        Ok(Query::<E>::compiled_query_from_plan(
440            entry.logical_plan().clone(),
441        ))
442    }
443
444    // Build one logical planned-query shell using only the indexes currently
445    // visible for the query's recovered store.
446    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
447        &self,
448        query: &Query<E>,
449    ) -> Result<PlannedQuery<E>, QueryError>
450    where
451        E: EntityKind<Canister = C>,
452    {
453        let (entry, _) = self.cached_query_plan_entry_for_authority(
454            crate::db::executor::EntityAuthority::for_type::<E>(),
455            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
456            query.structural(),
457        )?;
458
459        Ok(Query::<E>::planned_query_from_plan(
460            entry.logical_plan().clone(),
461        ))
462    }
463
    // Project one logical explain payload using only planner-visible indexes.
    // Store visibility is resolved first; a failure there is returned before
    // the query's explain logic runs.
    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainPlan, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
    }
474
    // Hash one typed query plan using only the indexes currently visible for
    // the query's recovered store, returning the hash as a `String`. Store
    // visibility is resolved first; a failure there is returned before any
    // hashing runs.
    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
    }
486
    // Explain one load execution shape as an execution node descriptor, using
    // only planner-visible indexes from the recovered store state. Store
    // visibility is resolved first; a failure there is returned before the
    // query's explain logic runs.
    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
    }
498
    // Render one load execution descriptor plus route diagnostics as text,
    // using only planner-visible indexes from the recovered store state.
    // Store visibility is resolved first; a failure there is returned before
    // any rendering runs.
    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(
            query,
            Query::<E>::explain_execution_verbose_with_visible_indexes,
        )
    }
513
    // Explain one prepared fluent aggregate terminal using only
    // planner-visible indexes from the recovered store state. `strategy` is
    // forwarded unchanged to the query's explain logic once store visibility
    // has been resolved.
    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
        &self,
        query: &Query<E>,
        strategy: &S,
    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
        S: PreparedFluentAggregateExplainStrategy,
    {
        self.with_query_visible_indexes(query, |query, visible_indexes| {
            query
                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
        })
    }
530
    // Explain one `bytes_by(field)` terminal using only planner-visible
    // indexes from the recovered store state. `target_field` is forwarded
    // unchanged to the query's explain logic once store visibility has been
    // resolved.
    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
        target_field: &str,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, |query, visible_indexes| {
            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
        })
    }
545
    // Explain one prepared fluent projection terminal using only
    // planner-visible indexes from the recovered store state. `strategy` is
    // forwarded unchanged to the query's explain logic once store visibility
    // has been resolved.
    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
        strategy: &PreparedFluentProjectionStrategy,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, |query, visible_indexes| {
            query.explain_prepared_projection_terminal_with_visible_indexes(
                visible_indexes,
                strategy,
            )
        })
    }
563
564    // Validate that one execution strategy is admissible for scalar paged load
565    // execution and fail closed on grouped/primary-key-only routes.
566    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
567        match family {
568            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
569                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
570            )),
571            ExecutionFamily::Ordered => Ok(()),
572            ExecutionFamily::Grouped => Err(QueryError::invariant(
573                "grouped queries execute via execute(), not page().execute()",
574            )),
575        }
576    }
577
578    // Validate that one execution strategy is admissible for the grouped
579    // execution surface.
580    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
581        match family {
582            ExecutionFamily::Grouped => Ok(()),
583            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
584                "grouped execution requires grouped logical plans",
585            )),
586        }
587    }
588
589    // Finalize one grouped cursor page into the outward grouped execution
590    // payload so grouped cursor encoding and continuation-shape validation
591    // stay owned by the session boundary.
592    fn finalize_grouped_execution_page(
593        page: GroupedCursorPage,
594        trace: Option<ExecutionTrace>,
595    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
596        let next_cursor = page
597            .next_cursor
598            .map(|token| {
599                let Some(token) = token.as_grouped() else {
600                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
601                };
602
603                token.encode().map_err(|err| {
604                    QueryError::serialize_internal(format!(
605                        "failed to serialize grouped continuation cursor: {err}"
606                    ))
607                })
608            })
609            .transpose()?;
610
611        Ok(PagedGroupedExecutionWithTrace::new(
612            page.rows,
613            next_cursor,
614            trace,
615        ))
616    }
617
618    /// Execute one scalar load/delete query and return materialized response rows.
619    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
620    where
621        E: PersistedRow<Canister = C> + EntityValue,
622    {
623        // Phase 1: compile typed intent into one prepared execution-plan contract.
624        let mode = query.mode();
625        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
626
627        // Phase 2: delegate execution to the shared compiled-plan entry path.
628        self.execute_query_dyn(mode, plan)
629    }
630
    /// Execute one typed query while reporting the compile/execute split at
    /// the shared fluent query seam.
    ///
    /// Returns the query result together with a flattened attribution record.
    /// Instruction deltas come from the module's local instruction counter,
    /// which reads `0` on non-wasm targets, so measured deltas are zero there.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    #[expect(
        clippy::too_many_lines,
        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
    )]
    pub fn execute_query_result_with_attribution<E>(
        &self,
        query: &Query<E>,
    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure compile work at the typed/fluent boundary,
        // including the shared lower query-plan cache lookup/build exactly
        // once. This preserves honest hit/miss attribution without
        // double-building plans on one-shot cache misses.
        let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
            self.cached_prepared_query_plan_for_entity::<E>(query.structural())
        });
        let (plan, cache_attribution) = plan_and_cache?;

        // Phase 2: execute one query result using the prepared plan produced
        // by the compile/cache boundary above. The closure yields the result,
        // the normalized phase breakdown, and the response-decode instruction
        // count (zero for grouped and delete execution).
        let (execute_local_instructions, result) = measure_query_stage(
            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
                if query.has_grouping() {
                    // Grouped queries run without an initial cursor (`None`).
                    let (page, trace, phase_attribution) =
                        self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
                            executor
                                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
                                    plan, cursor,
                                )
                        })?;
                    let grouped = Self::finalize_grouped_execution_page(page, trace)?;

                    Ok((
                        LoadQueryResult::Grouped(grouped),
                        Self::grouped_query_execute_phase_attribution(phase_attribution),
                        0,
                    ))
                } else {
                    match query.mode() {
                        QueryMode::Load(_) => {
                            let (rows, phase_attribution, response_decode_local_instructions) =
                                self.load_executor::<E>()
                                    .execute_with_phase_attribution(plan)
                                    .map_err(QueryError::execute)?;

                            Ok((
                                LoadQueryResult::Rows(rows),
                                Self::scalar_query_execute_phase_attribution(phase_attribution),
                                response_decode_local_instructions,
                            ))
                        }
                        QueryMode::Delete(_) => {
                            // Delete execution reports no phase breakdown
                            // here, so it carries an all-zero attribution.
                            let result = self.execute_query_dyn(query.mode(), plan)?;

                            Ok((
                                LoadQueryResult::Rows(result),
                                Self::empty_query_execute_phase_attribution(),
                                0,
                            ))
                        }
                    }
                }
            },
        );
        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
        // The total is the saturating sum of the two measured stages.
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);

        // Phase 3: flatten the phase breakdown and cache counters into the
        // public attribution record.
        Ok((
            result,
            QueryExecutionAttribution {
                compile_local_instructions,
                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
                direct_data_row_scan_local_instructions: execute_phase_attribution
                    .direct_data_row_scan_local_instructions,
                direct_data_row_key_stream_local_instructions: execute_phase_attribution
                    .direct_data_row_key_stream_local_instructions,
                direct_data_row_row_read_local_instructions: execute_phase_attribution
                    .direct_data_row_row_read_local_instructions,
                direct_data_row_key_encode_local_instructions: execute_phase_attribution
                    .direct_data_row_key_encode_local_instructions,
                direct_data_row_store_get_local_instructions: execute_phase_attribution
                    .direct_data_row_store_get_local_instructions,
                direct_data_row_order_window_local_instructions: execute_phase_attribution
                    .direct_data_row_order_window_local_instructions,
                direct_data_row_page_window_local_instructions: execute_phase_attribution
                    .direct_data_row_page_window_local_instructions,
                grouped_stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                grouped_fold_local_instructions: execute_phase_attribution
                    .grouped_fold_local_instructions,
                grouped_finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                grouped_count_borrowed_hash_computations: execute_phase_attribution
                    .grouped_count
                    .borrowed_hash_computations,
                grouped_count_bucket_candidate_checks: execute_phase_attribution
                    .grouped_count
                    .bucket_candidate_checks,
                grouped_count_existing_group_hits: execute_phase_attribution
                    .grouped_count
                    .existing_group_hits,
                grouped_count_new_group_inserts: execute_phase_attribution
                    .grouped_count
                    .new_group_inserts,
                grouped_count_row_materialization_local_instructions: execute_phase_attribution
                    .grouped_count
                    .row_materialization_local_instructions,
                grouped_count_group_lookup_local_instructions: execute_phase_attribution
                    .grouped_count
                    .group_lookup_local_instructions,
                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
                    .grouped_count
                    .existing_group_update_local_instructions,
                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
                    .grouped_count
                    .new_group_insert_local_instructions,
                response_decode_local_instructions,
                execute_local_instructions,
                total_local_instructions,
                shared_query_plan_cache_hits: cache_attribution.hits,
                shared_query_plan_cache_misses: cache_attribution.misses,
            },
        ))
    }
763
764    // Execute one typed query through the unified row/grouped result surface so
765    // higher layers do not need to branch on grouped shape themselves.
766    #[doc(hidden)]
767    pub fn execute_query_result<E>(
768        &self,
769        query: &Query<E>,
770    ) -> Result<LoadQueryResult<E>, QueryError>
771    where
772        E: PersistedRow<Canister = C> + EntityValue,
773    {
774        if query.has_grouping() {
775            return self
776                .execute_grouped(query, None)
777                .map(LoadQueryResult::Grouped);
778        }
779
780        self.execute_query(query).map(LoadQueryResult::Rows)
781    }
782
783    /// Execute one typed delete query and return only the affected-row count.
784    #[doc(hidden)]
785    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
786    where
787        E: PersistedRow<Canister = C> + EntityValue,
788    {
789        // Phase 1: fail closed if the caller routes a non-delete query here.
790        if !query.mode().is_delete() {
791            return Err(QueryError::unsupported_query(
792                "delete count execution requires delete query mode",
793            ));
794        }
795
796        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
797        let plan = self
798            .compile_query_with_visible_indexes(query)?
799            .into_prepared_execution_plan();
800
801        // Phase 3: execute the shared delete core while skipping response-row materialization.
802        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
803            .map_err(QueryError::execute)
804    }
805
806    /// Execute one scalar query from one pre-built prepared execution contract.
807    ///
808    /// This is the shared compiled-plan entry boundary used by the typed
809    /// `execute_query(...)` surface and adjacent query execution facades.
810    pub(in crate::db) fn execute_query_dyn<E>(
811        &self,
812        mode: QueryMode,
813        plan: PreparedExecutionPlan<E>,
814    ) -> Result<EntityResponse<E>, QueryError>
815    where
816        E: PersistedRow<Canister = C> + EntityValue,
817    {
818        let result = match mode {
819            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
820            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
821        };
822
823        result.map_err(QueryError::execute)
824    }
825
826    // Shared load-query terminal wrapper: build plan, run under metrics, map
827    // execution errors into query-facing errors.
828    pub(in crate::db) fn execute_load_query_with<E, T>(
829        &self,
830        query: &Query<E>,
831        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
832    ) -> Result<T, QueryError>
833    where
834        E: PersistedRow<Canister = C> + EntityValue,
835    {
836        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
837
838        self.with_metrics(|| op(self.load_executor::<E>(), plan))
839            .map_err(QueryError::execute)
840    }
841
842    /// Build one trace payload for a query without executing it.
843    ///
844    /// This lightweight surface is intended for developer diagnostics:
845    /// plan hash, access strategy summary, and planner/executor route shape.
846    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
847    where
848        E: EntityKind<Canister = C>,
849    {
850        let compiled = self.compile_query_with_visible_indexes(query)?;
851        let explain = compiled.explain();
852        let plan_hash = compiled.plan_hash_hex();
853
854        let (executable, _) =
855            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
856        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
857        let execution_family = match query.mode() {
858            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
859            QueryMode::Delete(_) => None,
860        };
861
862        Ok(QueryTracePlan::new(
863            plan_hash,
864            access_strategy,
865            execution_family,
866            explain,
867        ))
868    }
869
870    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
871    pub(crate) fn execute_load_query_paged_with_trace<E>(
872        &self,
873        query: &Query<E>,
874        cursor_token: Option<&str>,
875    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
876    where
877        E: PersistedRow<Canister = C> + EntityValue,
878    {
879        // Phase 1: build/validate prepared execution plan and reject grouped plans.
880        let plan = self
881            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
882            .0;
883        Self::ensure_scalar_paged_execution_family(
884            plan.execution_family().map_err(QueryError::execute)?,
885        )?;
886
887        // Phase 2: decode external cursor token and validate it against plan surface.
888        let cursor_bytes = decode_optional_cursor_token(cursor_token)
889            .map_err(QueryError::from_cursor_plan_error)?;
890        let cursor = plan
891            .prepare_cursor(cursor_bytes.as_deref())
892            .map_err(QueryError::from_executor_plan_error)?;
893
894        // Phase 3: execute one traced page and encode outbound continuation token.
895        let (page, trace) = self
896            .with_metrics(|| {
897                self.load_executor::<E>()
898                    .execute_paged_with_cursor_traced(plan, cursor)
899            })
900            .map_err(QueryError::execute)?;
901        let next_cursor = page
902            .next_cursor
903            .map(|token| {
904                let Some(token) = token.as_scalar() else {
905                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
906                };
907
908                token.encode().map_err(|err| {
909                    QueryError::serialize_internal(format!(
910                        "failed to serialize continuation cursor: {err}"
911                    ))
912                })
913            })
914            .transpose()?;
915
916        Ok(PagedLoadExecutionWithTrace::new(
917            page.items,
918            next_cursor,
919            trace,
920        ))
921    }
922
923    /// Execute one grouped query page with optional grouped continuation cursor.
924    ///
925    /// This is the explicit grouped execution boundary; scalar load APIs reject
926    /// grouped plans to preserve scalar response contracts.
927    pub(in crate::db) fn execute_grouped<E>(
928        &self,
929        query: &Query<E>,
930        cursor_token: Option<&str>,
931    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
932    where
933        E: PersistedRow<Canister = C> + EntityValue,
934    {
935        // Phase 1: build the prepared execution plan once from the typed query.
936        let plan = self
937            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
938            .0;
939
940        // Phase 2: reuse the shared prepared grouped execution path and then
941        // finalize the outward grouped payload at the session boundary.
942        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
943
944        Self::finalize_grouped_execution_page(page, trace)
945    }
946
947    // Execute one grouped prepared plan page with optional grouped cursor
948    // while letting the caller choose the final grouped-runtime dispatch.
949    fn execute_grouped_plan_with<E, T>(
950        &self,
951        plan: PreparedExecutionPlan<E>,
952        cursor_token: Option<&str>,
953        op: impl FnOnce(
954            LoadExecutor<E>,
955            PreparedExecutionPlan<E>,
956            crate::db::cursor::GroupedPlannedCursor,
957        ) -> Result<T, InternalError>,
958    ) -> Result<T, QueryError>
959    where
960        E: PersistedRow<Canister = C> + EntityValue,
961    {
962        // Phase 1: validate the prepared plan shape before decoding cursors.
963        Self::ensure_grouped_execution_family(
964            plan.execution_family().map_err(QueryError::execute)?,
965        )?;
966
967        // Phase 2: decode external grouped cursor token and validate against plan.
968        let cursor = decode_optional_grouped_cursor_token(cursor_token)
969            .map_err(QueryError::from_cursor_plan_error)?;
970        let cursor = plan
971            .prepare_grouped_cursor_token(cursor)
972            .map_err(QueryError::from_executor_plan_error)?;
973
974        // Phase 3: execute one grouped page while preserving the structural
975        // grouped cursor payload for whichever outward cursor format the caller needs.
976        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
977            .map_err(QueryError::execute)
978    }
979
980    // Execute one grouped prepared plan page with optional grouped cursor.
981    fn execute_grouped_plan_with_trace<E>(
982        &self,
983        plan: PreparedExecutionPlan<E>,
984        cursor_token: Option<&str>,
985    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
986    where
987        E: PersistedRow<Canister = C> + EntityValue,
988    {
989        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
990            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
991        })
992    }
993}
994
995impl QueryPlanCacheKey {
996    // Assemble the canonical cache-key shell once so the test and
997    // normalized-predicate constructors only decide which structural query key
998    // they feed into the shared session cache identity.
999    const fn from_authority_parts(
1000        authority: crate::db::executor::EntityAuthority,
1001        schema_fingerprint: CommitSchemaFingerprint,
1002        visibility: QueryPlanVisibility,
1003        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1004        cache_method_version: u8,
1005    ) -> Self {
1006        Self {
1007            cache_method_version,
1008            entity_path: authority.entity_path(),
1009            schema_fingerprint,
1010            visibility,
1011            structural_query,
1012        }
1013    }
1014
1015    #[cfg(test)]
1016    fn for_authority_with_method_version(
1017        authority: crate::db::executor::EntityAuthority,
1018        schema_fingerprint: CommitSchemaFingerprint,
1019        visibility: QueryPlanVisibility,
1020        query: &StructuralQuery,
1021        cache_method_version: u8,
1022    ) -> Self {
1023        Self::from_authority_parts(
1024            authority,
1025            schema_fingerprint,
1026            visibility,
1027            query.structural_cache_key(),
1028            cache_method_version,
1029        )
1030    }
1031
1032    fn for_authority_with_normalized_predicate_and_method_version(
1033        authority: crate::db::executor::EntityAuthority,
1034        schema_fingerprint: CommitSchemaFingerprint,
1035        visibility: QueryPlanVisibility,
1036        query: &StructuralQuery,
1037        normalized_predicate: Option<&crate::db::predicate::Predicate>,
1038        cache_method_version: u8,
1039    ) -> Self {
1040        Self::from_authority_parts(
1041            authority,
1042            schema_fingerprint,
1043            visibility,
1044            query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1045            cache_method_version,
1046        )
1047    }
1048}