Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        TraceReuseArtifactClass, TraceReuseEvent,
16        access::AccessStrategy,
17        commit::CommitSchemaFingerprint,
18        cursor::{
19            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
20        },
21        diagnostics::ExecutionTrace,
22        executor::{
23            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
24            SharedPreparedExecutionPlan,
25        },
26        predicate::predicate_fingerprint_normalized,
27        query::builder::{
28            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
29        },
30        query::explain::{
31            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
32        },
33        query::{
34            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
35            plan::{QueryMode, VisibleIndexes},
36        },
37    },
38    error::InternalError,
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41};
42#[cfg(feature = "diagnostics")]
43use candid::CandidType;
44#[cfg(feature = "diagnostics")]
45use serde::Deserialize;
46use std::{cell::RefCell, collections::HashMap};
47
// Version tag carried by every shared lower query-plan cache key.
//
// Increment it whenever the meaning of the cache key changes, so entries that
// are already held in heap miss on lookup instead of aliasing a plan that was
// stored under the previous key semantics.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 2;
51
52#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
53pub(in crate::db) enum QueryPlanVisibility {
54    StoreNotReady,
55    StoreReady,
56}
57
58#[derive(Clone, Debug, Eq, Hash, PartialEq)]
59pub(in crate::db) struct QueryPlanCacheKey {
60    cache_method_version: u8,
61    entity_path: &'static str,
62    schema_fingerprint: CommitSchemaFingerprint,
63    visibility: QueryPlanVisibility,
64    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
65}
66
67pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan>;
68
69thread_local! {
70    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
71    // facades can share prepared logical plans across update/query calls while
72    // tests and multi-registry host processes remain isolated by registry
73    // identity.
74    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
75        RefCell::new(HashMap::default());
76}
77
78#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
79pub(in crate::db) struct QueryPlanCacheAttribution {
80    pub hits: u64,
81    pub misses: u64,
82}
83
84impl QueryPlanCacheAttribution {
85    #[must_use]
86    const fn hit() -> Self {
87        Self { hits: 1, misses: 0 }
88    }
89
90    #[must_use]
91    const fn miss() -> Self {
92        Self { hits: 0, misses: 1 }
93    }
94}
95
96// Map one shared query-plan cache attribution outcome onto the explicit reuse
97// event shipped in `0.109.0`.
98pub(in crate::db::session) const fn query_plan_cache_reuse_event(
99    attribution: QueryPlanCacheAttribution,
100) -> TraceReuseEvent {
101    if attribution.hits > 0 {
102        TraceReuseEvent::hit(TraceReuseArtifactClass::SharedPreparedQueryPlan)
103    } else {
104        TraceReuseEvent::miss(TraceReuseArtifactClass::SharedPreparedQueryPlan)
105    }
106}
107
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution is the flat, serializable report of where local
/// instructions were spent while one typed/fluent query was compiled and
/// executed at the session boundary, together with the shared query-plan
/// cache outcome for that call.
///
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Instructions measured around the cached prepared-plan lookup/build.
    pub compile_local_instructions: u64,
    /// Runtime share reported by the executor's phase attribution.
    pub runtime_local_instructions: u64,
    /// Finalize share reported by the executor's phase attribution.
    pub finalize_local_instructions: u64,

    // Scalar direct data-row counters, copied from the scalar phase
    // attribution; grouped and delete executions report zero for all of them.
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,

    // Grouped phase counters, copied from the grouped phase attribution;
    // scalar and delete executions report zero for all of them.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // Flattened copy of the executor's grouped-count attribution.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    /// Response decode cost reported by the scalar load executor; zero for
    /// grouped and delete executions.
    pub response_decode_local_instructions: u64,
    /// Instructions measured around the whole execute stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute measurements.
    pub total_local_instructions: u64,
    /// Shared query-plan cache hits recorded for this call.
    pub shared_query_plan_cache_hits: u64,
    /// Shared query-plan cache misses recorded for this call.
    pub shared_query_plan_cache_misses: u64,
}
144
///
/// QueryExecutePhaseAttribution
///
/// Session-local execute-phase counters. Scalar and grouped executor
/// attributions are both normalized into this one shape before being
/// flattened into the outward `QueryExecutionAttribution`.
///
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    // Shared runtime/finalize split.
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,

    // Scalar direct data-row counters (zero for grouped execution).
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,

    // Grouped phase counters (zero for scalar execution).
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,

    // Executor-owned grouped-count attribution, carried through unchanged.
    grouped_count: GroupedCountAttribution,
}
162
// Read the local instruction counter used for query stage attribution.
//
// Off wasm32 there is no counter to read, so the value is a constant zero and
// every measured delta collapses to zero. On wasm32 the value comes from
// `performance_counter(1)`.
#[cfg(feature = "diagnostics")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
179
// Run one fallible query stage and report how many local instructions it
// consumed alongside its untouched result. The counter is sampled immediately
// before and after `run`; the difference saturates at zero.
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
188
189impl<C: CanisterKind> DbSession<C> {
// Build the all-zero execute-phase attribution used when an execution path
// reports no phase breakdown of its own.
#[cfg(feature = "diagnostics")]
const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
    const NOTHING_SPENT: u64 = 0;

    QueryExecutePhaseAttribution {
        runtime_local_instructions: NOTHING_SPENT,
        finalize_local_instructions: NOTHING_SPENT,
        direct_data_row_scan_local_instructions: NOTHING_SPENT,
        direct_data_row_key_stream_local_instructions: NOTHING_SPENT,
        direct_data_row_row_read_local_instructions: NOTHING_SPENT,
        direct_data_row_key_encode_local_instructions: NOTHING_SPENT,
        direct_data_row_store_get_local_instructions: NOTHING_SPENT,
        direct_data_row_order_window_local_instructions: NOTHING_SPENT,
        direct_data_row_page_window_local_instructions: NOTHING_SPENT,
        grouped_stream_local_instructions: NOTHING_SPENT,
        grouped_fold_local_instructions: NOTHING_SPENT,
        grouped_finalize_local_instructions: NOTHING_SPENT,
        grouped_count: GroupedCountAttribution::none(),
    }
}
208
// Normalize one scalar executor phase attribution into the session-local
// shape. Every scalar counter is copied across; the grouped counters and the
// grouped-count attribution keep the empty (zero / `none()`) values.
#[cfg(feature = "diagnostics")]
const fn scalar_query_execute_phase_attribution(
    phase: ScalarExecutePhaseAttribution,
) -> QueryExecutePhaseAttribution {
    QueryExecutePhaseAttribution {
        runtime_local_instructions: phase.runtime_local_instructions,
        finalize_local_instructions: phase.finalize_local_instructions,
        direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
        direct_data_row_key_stream_local_instructions: phase
            .direct_data_row_key_stream_local_instructions,
        direct_data_row_row_read_local_instructions: phase
            .direct_data_row_row_read_local_instructions,
        direct_data_row_key_encode_local_instructions: phase
            .direct_data_row_key_encode_local_instructions,
        direct_data_row_store_get_local_instructions: phase
            .direct_data_row_store_get_local_instructions,
        direct_data_row_order_window_local_instructions: phase
            .direct_data_row_order_window_local_instructions,
        direct_data_row_page_window_local_instructions: phase
            .direct_data_row_page_window_local_instructions,
        // Grouped fields are not produced by scalar execution.
        ..Self::empty_query_execute_phase_attribution()
    }
}
235
// Normalize one grouped executor phase attribution into the session-local
// shape. The shared runtime figure is the saturating sum of the stream and
// fold phases; the scalar direct data-row counters keep their empty (zero)
// values.
#[cfg(feature = "diagnostics")]
const fn grouped_query_execute_phase_attribution(
    phase: GroupedExecutePhaseAttribution,
) -> QueryExecutePhaseAttribution {
    let stream = phase.stream_local_instructions;
    let fold = phase.fold_local_instructions;
    let finalize = phase.finalize_local_instructions;

    QueryExecutePhaseAttribution {
        runtime_local_instructions: stream.saturating_add(fold),
        finalize_local_instructions: finalize,
        grouped_stream_local_instructions: stream,
        grouped_fold_local_instructions: fold,
        grouped_finalize_local_instructions: finalize,
        grouped_count: phase.grouped_count,
        // Direct data-row fields are not produced by grouped execution.
        ..Self::empty_query_execute_phase_attribution()
    }
}
258
259    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
260        let scope_id = self.db.cache_scope_id();
261
262        QUERY_PLAN_CACHES.with(|caches| {
263            let mut caches = caches.borrow_mut();
264            let cache = caches.entry(scope_id).or_default();
265
266            f(cache)
267        })
268    }
269
270    const fn visible_indexes_for_model(
271        model: &'static EntityModel,
272        visibility: QueryPlanVisibility,
273    ) -> VisibleIndexes<'static> {
274        match visibility {
275            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
276            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
277        }
278    }
279
// Test-only: number of plans currently cached for this session's scope.
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
    self.with_query_plan_cache(|plans| plans.len())
}
284
// Test-only: drop every plan cached for this session's scope.
#[cfg(test)]
pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
    self.with_query_plan_cache(HashMap::clear);
}
289
290    pub(in crate::db) fn query_plan_visibility_for_store_path(
291        &self,
292        store_path: &'static str,
293    ) -> Result<QueryPlanVisibility, QueryError> {
294        let store = self
295            .db
296            .recovered_store(store_path)
297            .map_err(QueryError::execute)?;
298        let visibility = if store.index_state() == crate::db::IndexState::Ready {
299            QueryPlanVisibility::StoreReady
300        } else {
301            QueryPlanVisibility::StoreNotReady
302        };
303
304        Ok(visibility)
305    }
306
307    pub(in crate::db) fn cached_shared_query_plan_for_authority(
308        &self,
309        authority: crate::db::executor::EntityAuthority,
310        schema_fingerprint: CommitSchemaFingerprint,
311        query: &StructuralQuery,
312    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
313        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
314        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
315        let planning_state = query.prepare_scalar_planning_state()?;
316        let normalized_predicate_fingerprint = planning_state
317            .normalized_predicate()
318            .map(predicate_fingerprint_normalized);
319        let cache_key =
320            QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
321                authority,
322                schema_fingerprint,
323                visibility,
324                query,
325                normalized_predicate_fingerprint,
326                SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
327            );
328
329        {
330            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
331            if let Some(prepared_plan) = cached {
332                return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
333            }
334        }
335
336        let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
337            &visible_indexes,
338            planning_state,
339        )?;
340        let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
341        self.with_query_plan_cache(|cache| {
342            cache.insert(cache_key, prepared_plan.clone());
343        });
344
345        Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
346    }
347
// Test-only: build a cache key for the given authority, fingerprint,
// visibility, query and an explicit key-format version, by delegating
// straight to the key's own constructor.
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_key_for_tests(
    authority: crate::db::executor::EntityAuthority,
    schema_fingerprint: CommitSchemaFingerprint,
    visibility: QueryPlanVisibility,
    query: &StructuralQuery,
    cache_method_version: u8,
) -> QueryPlanCacheKey {
    QueryPlanCacheKey::for_authority_with_method_version(
        authority,
        schema_fingerprint,
        visibility,
        query,
        cache_method_version,
    )
}
364
365    // Resolve the planner-visible index slice for one typed query exactly once
366    // at the session boundary before handing execution/planning off to query-owned logic.
367    fn with_query_visible_indexes<E, T>(
368        &self,
369        query: &Query<E>,
370        op: impl FnOnce(
371            &Query<E>,
372            &crate::db::query::plan::VisibleIndexes<'static>,
373        ) -> Result<T, QueryError>,
374    ) -> Result<T, QueryError>
375    where
376        E: EntityKind<Canister = C>,
377    {
378        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
379        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
380
381        op(query, &visible_indexes)
382    }
383
384    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
385        &self,
386        query: &Query<E>,
387    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
388    where
389        E: EntityKind<Canister = C>,
390    {
391        let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
392
393        Ok((prepared_plan.typed_clone::<E>(), attribution))
394    }
395
396    // Resolve one typed query through the shared lower query-plan cache using
397    // the canonical authority and schema-fingerprint pair for that entity.
398    fn cached_shared_query_plan_for_entity<E>(
399        &self,
400        query: &Query<E>,
401    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
402    where
403        E: EntityKind<Canister = C>,
404    {
405        self.cached_shared_query_plan_for_authority(
406            crate::db::executor::EntityAuthority::for_type::<E>(),
407            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
408            query.structural(),
409        )
410    }
411
412    // Map one typed query onto one cached lower prepared plan so query-owned
413    // planned and compiled wrappers do not each repeat the same cache lookup.
414    fn map_cached_shared_query_plan_for_entity<E, T>(
415        &self,
416        query: &Query<E>,
417        map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
418    ) -> Result<T, QueryError>
419    where
420        E: EntityKind<Canister = C>,
421    {
422        let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
423
424        Ok(map(prepared_plan))
425    }
426
427    // Compile one typed query using only the indexes currently visible for the
428    // query's recovered store.
429    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
430        &self,
431        query: &Query<E>,
432    ) -> Result<CompiledQuery<E>, QueryError>
433    where
434        E: EntityKind<Canister = C>,
435    {
436        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
437    }
438
439    // Build one logical planned-query shell using only the indexes currently
440    // visible for the query's recovered store.
441    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
442        &self,
443        query: &Query<E>,
444    ) -> Result<PlannedQuery<E>, QueryError>
445    where
446        E: EntityKind<Canister = C>,
447    {
448        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
449    }
450
451    // Project one logical explain payload using only planner-visible indexes.
452    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
453        &self,
454        query: &Query<E>,
455    ) -> Result<ExplainPlan, QueryError>
456    where
457        E: EntityKind<Canister = C>,
458    {
459        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
460    }
461
462    // Hash one typed query plan using only the indexes currently visible for
463    // the query's recovered store.
464    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
465        &self,
466        query: &Query<E>,
467    ) -> Result<String, QueryError>
468    where
469        E: EntityKind<Canister = C>,
470    {
471        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
472    }
473
474    // Explain one load execution shape using only planner-visible
475    // indexes from the recovered store state.
476    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
477        &self,
478        query: &Query<E>,
479    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
480    where
481        E: EntityValue + EntityKind<Canister = C>,
482    {
483        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
484    }
485
486    // Render one load execution descriptor plus route diagnostics using
487    // only planner-visible indexes from the recovered store state.
488    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
489        &self,
490        query: &Query<E>,
491    ) -> Result<String, QueryError>
492    where
493        E: EntityValue + EntityKind<Canister = C>,
494    {
495        self.with_query_visible_indexes(query, |query, visible_indexes| {
496            let (prepared_plan, cache_attribution) =
497                self.cached_prepared_query_plan_for_entity(query)?;
498            let mut plan = prepared_plan.logical_plan().clone();
499
500            // Freeze the same planner-owned explain access-choice snapshot used
501            // by the direct non-cached explain path before rendering verbose
502            // diagnostics from the reused logical plan.
503            plan.finalize_access_choice_for_model_with_indexes(
504                query.structural().model(),
505                visible_indexes.as_slice(),
506            );
507
508            query
509                .structural()
510                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
511                    &plan,
512                    Some(query_plan_cache_reuse_event(cache_attribution)),
513                    |_| {},
514                )
515                .map(|diagnostics| diagnostics.render_text_verbose())
516        })
517    }
518
519    // Explain one prepared fluent aggregate terminal using only
520    // planner-visible indexes from the recovered store state.
521    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
522        &self,
523        query: &Query<E>,
524        strategy: &S,
525    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
526    where
527        E: EntityValue + EntityKind<Canister = C>,
528        S: PreparedFluentAggregateExplainStrategy,
529    {
530        self.with_query_visible_indexes(query, |query, visible_indexes| {
531            query
532                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
533        })
534    }
535
536    // Explain one `bytes_by(field)` terminal using only planner-visible
537    // indexes from the recovered store state.
538    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
539        &self,
540        query: &Query<E>,
541        target_field: &str,
542    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
543    where
544        E: EntityValue + EntityKind<Canister = C>,
545    {
546        self.with_query_visible_indexes(query, |query, visible_indexes| {
547            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
548        })
549    }
550
551    // Explain one prepared fluent projection terminal using only
552    // planner-visible indexes from the recovered store state.
553    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
554        &self,
555        query: &Query<E>,
556        strategy: &PreparedFluentProjectionStrategy,
557    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
558    where
559        E: EntityValue + EntityKind<Canister = C>,
560    {
561        self.with_query_visible_indexes(query, |query, visible_indexes| {
562            query.explain_prepared_projection_terminal_with_visible_indexes(
563                visible_indexes,
564                strategy,
565            )
566        })
567    }
568
569    // Validate that one execution strategy is admissible for scalar paged load
570    // execution and fail closed on grouped/primary-key-only routes.
571    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
572        match family {
573            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
574                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
575            )),
576            ExecutionFamily::Ordered => Ok(()),
577            ExecutionFamily::Grouped => Err(QueryError::invariant(
578                "grouped queries execute via execute(), not page().execute()",
579            )),
580        }
581    }
582
583    // Validate that one execution strategy is admissible for the grouped
584    // execution surface.
585    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
586        match family {
587            ExecutionFamily::Grouped => Ok(()),
588            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
589                "grouped execution requires grouped logical plans",
590            )),
591        }
592    }
593
594    // Finalize one grouped cursor page into the outward grouped execution
595    // payload so grouped cursor encoding and continuation-shape validation
596    // stay owned by the session boundary.
597    fn finalize_grouped_execution_page(
598        page: GroupedCursorPage,
599        trace: Option<ExecutionTrace>,
600    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
601        let next_cursor = page
602            .next_cursor
603            .map(|token| {
604                let Some(token) = token.as_grouped() else {
605                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
606                };
607
608                token.encode().map_err(|err| {
609                    QueryError::serialize_internal(format!(
610                        "failed to serialize grouped continuation cursor: {err}"
611                    ))
612                })
613            })
614            .transpose()?;
615
616        Ok(PagedGroupedExecutionWithTrace::new(
617            page.rows,
618            next_cursor,
619            trace,
620        ))
621    }
622
623    /// Execute one scalar load/delete query and return materialized response rows.
624    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
625    where
626        E: PersistedRow<Canister = C> + EntityValue,
627    {
628        // Phase 1: compile typed intent into one prepared execution-plan contract.
629        let mode = query.mode();
630        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
631
632        // Phase 2: delegate execution to the shared compiled-plan entry path.
633        self.execute_query_dyn(mode, plan)
634    }
635
636    /// Execute one typed query while reporting the compile/execute split at
637    /// the shared fluent query seam.
638    #[cfg(feature = "diagnostics")]
639    #[doc(hidden)]
640    #[expect(
641        clippy::too_many_lines,
642        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
643    )]
644    pub fn execute_query_result_with_attribution<E>(
645        &self,
646        query: &Query<E>,
647    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
648    where
649        E: PersistedRow<Canister = C> + EntityValue,
650    {
651        // Phase 1: measure compile work at the typed/fluent boundary,
652        // including the shared lower query-plan cache lookup/build exactly
653        // once. This preserves honest hit/miss attribution without
654        // double-building plans on one-shot cache misses.
655        let (compile_local_instructions, plan_and_cache) =
656            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
657        let (plan, cache_attribution) = plan_and_cache?;
658
659        // Phase 2: execute one query result using the prepared plan produced
660        // by the compile/cache boundary above.
661        let (execute_local_instructions, result) = measure_query_stage(
662            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
663                if query.has_grouping() {
664                    let (page, trace, phase_attribution) =
665                        self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
666                            executor
667                                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
668                                    plan, cursor,
669                                )
670                        })?;
671                    let grouped = Self::finalize_grouped_execution_page(page, trace)?;
672
673                    Ok((
674                        LoadQueryResult::Grouped(grouped),
675                        Self::grouped_query_execute_phase_attribution(phase_attribution),
676                        0,
677                    ))
678                } else {
679                    match query.mode() {
680                        QueryMode::Load(_) => {
681                            let (rows, phase_attribution, response_decode_local_instructions) =
682                                self.load_executor::<E>()
683                                    .execute_with_phase_attribution(plan)
684                                    .map_err(QueryError::execute)?;
685
686                            Ok((
687                                LoadQueryResult::Rows(rows),
688                                Self::scalar_query_execute_phase_attribution(phase_attribution),
689                                response_decode_local_instructions,
690                            ))
691                        }
692                        QueryMode::Delete(_) => {
693                            let result = self.execute_query_dyn(query.mode(), plan)?;
694
695                            Ok((
696                                LoadQueryResult::Rows(result),
697                                Self::empty_query_execute_phase_attribution(),
698                                0,
699                            ))
700                        }
701                    }
702                }
703            },
704        );
705        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
706        let total_local_instructions =
707            compile_local_instructions.saturating_add(execute_local_instructions);
708
709        Ok((
710            result,
711            QueryExecutionAttribution {
712                compile_local_instructions,
713                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
714                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
715                direct_data_row_scan_local_instructions: execute_phase_attribution
716                    .direct_data_row_scan_local_instructions,
717                direct_data_row_key_stream_local_instructions: execute_phase_attribution
718                    .direct_data_row_key_stream_local_instructions,
719                direct_data_row_row_read_local_instructions: execute_phase_attribution
720                    .direct_data_row_row_read_local_instructions,
721                direct_data_row_key_encode_local_instructions: execute_phase_attribution
722                    .direct_data_row_key_encode_local_instructions,
723                direct_data_row_store_get_local_instructions: execute_phase_attribution
724                    .direct_data_row_store_get_local_instructions,
725                direct_data_row_order_window_local_instructions: execute_phase_attribution
726                    .direct_data_row_order_window_local_instructions,
727                direct_data_row_page_window_local_instructions: execute_phase_attribution
728                    .direct_data_row_page_window_local_instructions,
729                grouped_stream_local_instructions: execute_phase_attribution
730                    .grouped_stream_local_instructions,
731                grouped_fold_local_instructions: execute_phase_attribution
732                    .grouped_fold_local_instructions,
733                grouped_finalize_local_instructions: execute_phase_attribution
734                    .grouped_finalize_local_instructions,
735                grouped_count_borrowed_hash_computations: execute_phase_attribution
736                    .grouped_count
737                    .borrowed_hash_computations,
738                grouped_count_bucket_candidate_checks: execute_phase_attribution
739                    .grouped_count
740                    .bucket_candidate_checks,
741                grouped_count_existing_group_hits: execute_phase_attribution
742                    .grouped_count
743                    .existing_group_hits,
744                grouped_count_new_group_inserts: execute_phase_attribution
745                    .grouped_count
746                    .new_group_inserts,
747                grouped_count_row_materialization_local_instructions: execute_phase_attribution
748                    .grouped_count
749                    .row_materialization_local_instructions,
750                grouped_count_group_lookup_local_instructions: execute_phase_attribution
751                    .grouped_count
752                    .group_lookup_local_instructions,
753                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
754                    .grouped_count
755                    .existing_group_update_local_instructions,
756                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
757                    .grouped_count
758                    .new_group_insert_local_instructions,
759                response_decode_local_instructions,
760                execute_local_instructions,
761                total_local_instructions,
762                shared_query_plan_cache_hits: cache_attribution.hits,
763                shared_query_plan_cache_misses: cache_attribution.misses,
764            },
765        ))
766    }
767
768    // Execute one typed query through the unified row/grouped result surface so
769    // higher layers do not need to branch on grouped shape themselves.
770    #[doc(hidden)]
771    pub fn execute_query_result<E>(
772        &self,
773        query: &Query<E>,
774    ) -> Result<LoadQueryResult<E>, QueryError>
775    where
776        E: PersistedRow<Canister = C> + EntityValue,
777    {
778        if query.has_grouping() {
779            return self
780                .execute_grouped(query, None)
781                .map(LoadQueryResult::Grouped);
782        }
783
784        self.execute_query(query).map(LoadQueryResult::Rows)
785    }
786
787    /// Execute one typed delete query and return only the affected-row count.
788    #[doc(hidden)]
789    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
790    where
791        E: PersistedRow<Canister = C> + EntityValue,
792    {
793        // Phase 1: fail closed if the caller routes a non-delete query here.
794        if !query.mode().is_delete() {
795            return Err(QueryError::unsupported_query(
796                "delete count execution requires delete query mode",
797            ));
798        }
799
800        // Phase 2: resolve one cached prepared execution-plan contract directly
801        // from the shared lower boundary instead of rebuilding it through the
802        // typed compiled-query wrapper.
803        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
804
805        // Phase 3: execute the shared delete core while skipping response-row materialization.
806        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
807            .map_err(QueryError::execute)
808    }
809
810    /// Execute one scalar query from one pre-built prepared execution contract.
811    ///
812    /// This is the shared compiled-plan entry boundary used by the typed
813    /// `execute_query(...)` surface and adjacent query execution facades.
814    pub(in crate::db) fn execute_query_dyn<E>(
815        &self,
816        mode: QueryMode,
817        plan: PreparedExecutionPlan<E>,
818    ) -> Result<EntityResponse<E>, QueryError>
819    where
820        E: PersistedRow<Canister = C> + EntityValue,
821    {
822        let result = match mode {
823            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
824            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
825        };
826
827        result.map_err(QueryError::execute)
828    }
829
830    // Shared load-query terminal wrapper: build plan, run under metrics, map
831    // execution errors into query-facing errors.
832    pub(in crate::db) fn execute_load_query_with<E, T>(
833        &self,
834        query: &Query<E>,
835        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
836    ) -> Result<T, QueryError>
837    where
838        E: PersistedRow<Canister = C> + EntityValue,
839    {
840        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
841
842        self.with_metrics(|| op(self.load_executor::<E>(), plan))
843            .map_err(QueryError::execute)
844    }
845
846    /// Build one trace payload for a query without executing it.
847    ///
848    /// This lightweight surface is intended for developer diagnostics:
849    /// plan hash, access strategy summary, and planner/executor route shape.
850    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
851    where
852        E: EntityKind<Canister = C>,
853    {
854        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
855        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
856        let (prepared_plan, cache_attribution) =
857            self.cached_prepared_query_plan_for_entity::<E>(query)?;
858        let logical_plan = prepared_plan.logical_plan();
859        let explain = logical_plan.explain();
860        let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
861        let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
862        let execution_family = match query.mode() {
863            QueryMode::Load(_) => Some(
864                prepared_plan
865                    .execution_family()
866                    .map_err(QueryError::execute)?,
867            ),
868            QueryMode::Delete(_) => None,
869        };
870        let reuse = query_plan_cache_reuse_event(cache_attribution);
871
872        Ok(QueryTracePlan::new(
873            plan_hash,
874            access_strategy,
875            execution_family,
876            reuse,
877            explain,
878        ))
879    }
880
881    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
882    pub(crate) fn execute_load_query_paged_with_trace<E>(
883        &self,
884        query: &Query<E>,
885        cursor_token: Option<&str>,
886    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
887    where
888        E: PersistedRow<Canister = C> + EntityValue,
889    {
890        // Phase 1: build/validate prepared execution plan and reject grouped plans.
891        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
892        Self::ensure_scalar_paged_execution_family(
893            plan.execution_family().map_err(QueryError::execute)?,
894        )?;
895
896        // Phase 2: decode external cursor token and validate it against plan surface.
897        let cursor_bytes = decode_optional_cursor_token(cursor_token)
898            .map_err(QueryError::from_cursor_plan_error)?;
899        let cursor = plan
900            .prepare_cursor(cursor_bytes.as_deref())
901            .map_err(QueryError::from_executor_plan_error)?;
902
903        // Phase 3: execute one traced page and encode outbound continuation token.
904        let (page, trace) = self
905            .with_metrics(|| {
906                self.load_executor::<E>()
907                    .execute_paged_with_cursor_traced(plan, cursor)
908            })
909            .map_err(QueryError::execute)?;
910        let next_cursor = page
911            .next_cursor
912            .map(|token| {
913                let Some(token) = token.as_scalar() else {
914                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
915                };
916
917                token.encode().map_err(|err| {
918                    QueryError::serialize_internal(format!(
919                        "failed to serialize continuation cursor: {err}"
920                    ))
921                })
922            })
923            .transpose()?;
924
925        Ok(PagedLoadExecutionWithTrace::new(
926            page.items,
927            next_cursor,
928            trace,
929        ))
930    }
931
932    /// Execute one grouped query page with optional grouped continuation cursor.
933    ///
934    /// This is the explicit grouped execution boundary; scalar load APIs reject
935    /// grouped plans to preserve scalar response contracts.
936    pub(in crate::db) fn execute_grouped<E>(
937        &self,
938        query: &Query<E>,
939        cursor_token: Option<&str>,
940    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
941    where
942        E: PersistedRow<Canister = C> + EntityValue,
943    {
944        // Phase 1: build the prepared execution plan once from the typed query.
945        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
946
947        // Phase 2: reuse the shared prepared grouped execution path and then
948        // finalize the outward grouped payload at the session boundary.
949        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
950
951        Self::finalize_grouped_execution_page(page, trace)
952    }
953
954    // Execute one grouped prepared plan page with optional grouped cursor
955    // while letting the caller choose the final grouped-runtime dispatch.
956    fn execute_grouped_plan_with<E, T>(
957        &self,
958        plan: PreparedExecutionPlan<E>,
959        cursor_token: Option<&str>,
960        op: impl FnOnce(
961            LoadExecutor<E>,
962            PreparedExecutionPlan<E>,
963            crate::db::cursor::GroupedPlannedCursor,
964        ) -> Result<T, InternalError>,
965    ) -> Result<T, QueryError>
966    where
967        E: PersistedRow<Canister = C> + EntityValue,
968    {
969        // Phase 1: validate the prepared plan shape before decoding cursors.
970        Self::ensure_grouped_execution_family(
971            plan.execution_family().map_err(QueryError::execute)?,
972        )?;
973
974        // Phase 2: decode external grouped cursor token and validate against plan.
975        let cursor = decode_optional_grouped_cursor_token(cursor_token)
976            .map_err(QueryError::from_cursor_plan_error)?;
977        let cursor = plan
978            .prepare_grouped_cursor_token(cursor)
979            .map_err(QueryError::from_executor_plan_error)?;
980
981        // Phase 3: execute one grouped page while preserving the structural
982        // grouped cursor payload for whichever outward cursor format the caller needs.
983        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
984            .map_err(QueryError::execute)
985    }
986
987    // Execute one grouped prepared plan page with optional grouped cursor.
988    fn execute_grouped_plan_with_trace<E>(
989        &self,
990        plan: PreparedExecutionPlan<E>,
991        cursor_token: Option<&str>,
992    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
993    where
994        E: PersistedRow<Canister = C> + EntityValue,
995    {
996        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
997            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
998        })
999    }
1000}
1001
1002impl QueryPlanCacheKey {
1003    // Assemble the canonical cache-key shell once so the test and
1004    // normalized-predicate constructors only decide which structural query key
1005    // they feed into the shared session cache identity.
1006    const fn from_authority_parts(
1007        authority: crate::db::executor::EntityAuthority,
1008        schema_fingerprint: CommitSchemaFingerprint,
1009        visibility: QueryPlanVisibility,
1010        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1011        cache_method_version: u8,
1012    ) -> Self {
1013        Self {
1014            cache_method_version,
1015            entity_path: authority.entity_path(),
1016            schema_fingerprint,
1017            visibility,
1018            structural_query,
1019        }
1020    }
1021
1022    #[cfg(test)]
1023    fn for_authority_with_method_version(
1024        authority: crate::db::executor::EntityAuthority,
1025        schema_fingerprint: CommitSchemaFingerprint,
1026        visibility: QueryPlanVisibility,
1027        query: &StructuralQuery,
1028        cache_method_version: u8,
1029    ) -> Self {
1030        Self::from_authority_parts(
1031            authority,
1032            schema_fingerprint,
1033            visibility,
1034            query.structural_cache_key(),
1035            cache_method_version,
1036        )
1037    }
1038
1039    fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1040        authority: crate::db::executor::EntityAuthority,
1041        schema_fingerprint: CommitSchemaFingerprint,
1042        visibility: QueryPlanVisibility,
1043        query: &StructuralQuery,
1044        normalized_predicate_fingerprint: Option<[u8; 32]>,
1045        cache_method_version: u8,
1046    ) -> Self {
1047        Self::from_authority_parts(
1048            authority,
1049            schema_fingerprint,
1050            visibility,
1051            query.structural_cache_key_with_normalized_predicate_fingerprint(
1052                normalized_predicate_fingerprint,
1053            ),
1054            cache_method_version,
1055        )
1056    }
1057}