Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        TraceReuseArtifactClass, TraceReuseEvent,
16        access::AccessStrategy,
17        commit::CommitSchemaFingerprint,
18        cursor::{
19            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
20        },
21        diagnostics::ExecutionTrace,
22        executor::{
23            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
24            SharedPreparedExecutionPlan,
25        },
26        predicate::predicate_fingerprint_normalized,
27        query::builder::{
28            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
29        },
30        query::explain::{
31            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
32        },
33        query::{
34            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
35            plan::{QueryMode, VisibleIndexes},
36        },
37    },
38    error::InternalError,
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41};
42#[cfg(feature = "diagnostics")]
43use candid::CandidType;
44#[cfg(feature = "diagnostics")]
45use serde::Deserialize;
46use std::{cell::RefCell, collections::HashMap};
47
48// Bump this when the shared lower query-plan cache key meaning changes in a
49// way that must force old in-heap entries to miss instead of aliasing.
50const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 2;
51
52#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
53pub(in crate::db) enum QueryPlanVisibility {
54    StoreNotReady,
55    StoreReady,
56}
57
58#[derive(Clone, Debug, Eq, Hash, PartialEq)]
59pub(in crate::db) struct QueryPlanCacheKey {
60    cache_method_version: u8,
61    entity_path: &'static str,
62    schema_fingerprint: CommitSchemaFingerprint,
63    visibility: QueryPlanVisibility,
64    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
65}
66
67pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan>;
68
69thread_local! {
70    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
71    // facades can share prepared logical plans across update/query calls while
72    // tests and multi-registry host processes remain isolated by registry
73    // identity.
74    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
75        RefCell::new(HashMap::default());
76}
77
78#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
79pub(in crate::db) struct QueryPlanCacheAttribution {
80    pub hits: u64,
81    pub misses: u64,
82}
83
84impl QueryPlanCacheAttribution {
85    #[must_use]
86    const fn hit() -> Self {
87        Self { hits: 1, misses: 0 }
88    }
89
90    #[must_use]
91    const fn miss() -> Self {
92        Self { hits: 0, misses: 1 }
93    }
94}
95
96// Map one shared query-plan cache attribution outcome onto the explicit reuse
97// event shipped in `0.109.0`.
98pub(in crate::db::session) const fn query_plan_cache_reuse_event(
99    attribution: QueryPlanCacheAttribution,
100) -> TraceReuseEvent {
101    if attribution.hits > 0 {
102        TraceReuseEvent::hit(TraceReuseArtifactClass::SharedPreparedQueryPlan)
103    } else {
104        TraceReuseEvent::miss(TraceReuseArtifactClass::SharedPreparedQueryPlan)
105    }
106}
107
108///
109/// QueryExecutionAttribution
110///
111/// QueryExecutionAttribution records the top-level compile/execute split for
112/// typed/fluent query execution at the session boundary.
113///
114#[cfg(feature = "diagnostics")]
115#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
116pub struct QueryExecutionAttribution {
117    pub compile_local_instructions: u64,
118    pub runtime_local_instructions: u64,
119    pub finalize_local_instructions: u64,
120    pub direct_data_row_scan_local_instructions: u64,
121    pub direct_data_row_key_stream_local_instructions: u64,
122    pub direct_data_row_row_read_local_instructions: u64,
123    pub direct_data_row_key_encode_local_instructions: u64,
124    pub direct_data_row_store_get_local_instructions: u64,
125    pub direct_data_row_order_window_local_instructions: u64,
126    pub direct_data_row_page_window_local_instructions: u64,
127    pub grouped_stream_local_instructions: u64,
128    pub grouped_fold_local_instructions: u64,
129    pub grouped_finalize_local_instructions: u64,
130    pub grouped_count_borrowed_hash_computations: u64,
131    pub grouped_count_bucket_candidate_checks: u64,
132    pub grouped_count_existing_group_hits: u64,
133    pub grouped_count_new_group_inserts: u64,
134    pub grouped_count_row_materialization_local_instructions: u64,
135    pub grouped_count_group_lookup_local_instructions: u64,
136    pub grouped_count_existing_group_update_local_instructions: u64,
137    pub grouped_count_new_group_insert_local_instructions: u64,
138    pub response_decode_local_instructions: u64,
139    pub execute_local_instructions: u64,
140    pub total_local_instructions: u64,
141    pub shared_query_plan_cache_hits: u64,
142    pub shared_query_plan_cache_misses: u64,
143}
144
145#[cfg(feature = "diagnostics")]
146#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
147struct QueryExecutePhaseAttribution {
148    runtime_local_instructions: u64,
149    finalize_local_instructions: u64,
150    direct_data_row_scan_local_instructions: u64,
151    direct_data_row_key_stream_local_instructions: u64,
152    direct_data_row_row_read_local_instructions: u64,
153    direct_data_row_key_encode_local_instructions: u64,
154    direct_data_row_store_get_local_instructions: u64,
155    direct_data_row_order_window_local_instructions: u64,
156    direct_data_row_page_window_local_instructions: u64,
157    grouped_stream_local_instructions: u64,
158    grouped_fold_local_instructions: u64,
159    grouped_finalize_local_instructions: u64,
160    grouped_count: GroupedCountAttribution,
161}
162
163#[cfg(feature = "diagnostics")]
164#[expect(
165    clippy::missing_const_for_fn,
166    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
167)]
168fn read_query_local_instruction_counter() -> u64 {
169    #[cfg(target_arch = "wasm32")]
170    {
171        canic_cdk::api::performance_counter(1)
172    }
173
174    #[cfg(not(target_arch = "wasm32"))]
175    {
176        0
177    }
178}
179
180#[cfg(feature = "diagnostics")]
181fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
182    let start = read_query_local_instruction_counter();
183    let result = run();
184    let delta = read_query_local_instruction_counter().saturating_sub(start);
185
186    (delta, result)
187}
188
189impl<C: CanisterKind> DbSession<C> {
190    #[cfg(feature = "diagnostics")]
191    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
192        QueryExecutePhaseAttribution {
193            runtime_local_instructions: 0,
194            finalize_local_instructions: 0,
195            direct_data_row_scan_local_instructions: 0,
196            direct_data_row_key_stream_local_instructions: 0,
197            direct_data_row_row_read_local_instructions: 0,
198            direct_data_row_key_encode_local_instructions: 0,
199            direct_data_row_store_get_local_instructions: 0,
200            direct_data_row_order_window_local_instructions: 0,
201            direct_data_row_page_window_local_instructions: 0,
202            grouped_stream_local_instructions: 0,
203            grouped_fold_local_instructions: 0,
204            grouped_finalize_local_instructions: 0,
205            grouped_count: GroupedCountAttribution::none(),
206        }
207    }
208
209    #[cfg(feature = "diagnostics")]
210    const fn scalar_query_execute_phase_attribution(
211        phase: ScalarExecutePhaseAttribution,
212    ) -> QueryExecutePhaseAttribution {
213        QueryExecutePhaseAttribution {
214            runtime_local_instructions: phase.runtime_local_instructions,
215            finalize_local_instructions: phase.finalize_local_instructions,
216            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
217            direct_data_row_key_stream_local_instructions: phase
218                .direct_data_row_key_stream_local_instructions,
219            direct_data_row_row_read_local_instructions: phase
220                .direct_data_row_row_read_local_instructions,
221            direct_data_row_key_encode_local_instructions: phase
222                .direct_data_row_key_encode_local_instructions,
223            direct_data_row_store_get_local_instructions: phase
224                .direct_data_row_store_get_local_instructions,
225            direct_data_row_order_window_local_instructions: phase
226                .direct_data_row_order_window_local_instructions,
227            direct_data_row_page_window_local_instructions: phase
228                .direct_data_row_page_window_local_instructions,
229            grouped_stream_local_instructions: 0,
230            grouped_fold_local_instructions: 0,
231            grouped_finalize_local_instructions: 0,
232            grouped_count: GroupedCountAttribution::none(),
233        }
234    }
235
236    #[cfg(feature = "diagnostics")]
237    const fn grouped_query_execute_phase_attribution(
238        phase: GroupedExecutePhaseAttribution,
239    ) -> QueryExecutePhaseAttribution {
240        QueryExecutePhaseAttribution {
241            runtime_local_instructions: phase
242                .stream_local_instructions
243                .saturating_add(phase.fold_local_instructions),
244            finalize_local_instructions: phase.finalize_local_instructions,
245            direct_data_row_scan_local_instructions: 0,
246            direct_data_row_key_stream_local_instructions: 0,
247            direct_data_row_row_read_local_instructions: 0,
248            direct_data_row_key_encode_local_instructions: 0,
249            direct_data_row_store_get_local_instructions: 0,
250            direct_data_row_order_window_local_instructions: 0,
251            direct_data_row_page_window_local_instructions: 0,
252            grouped_stream_local_instructions: phase.stream_local_instructions,
253            grouped_fold_local_instructions: phase.fold_local_instructions,
254            grouped_finalize_local_instructions: phase.finalize_local_instructions,
255            grouped_count: phase.grouped_count,
256        }
257    }
258
259    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
260        let scope_id = self.db.cache_scope_id();
261
262        QUERY_PLAN_CACHES.with(|caches| {
263            let mut caches = caches.borrow_mut();
264            let cache = caches.entry(scope_id).or_default();
265
266            f(cache)
267        })
268    }
269
270    const fn visible_indexes_for_model(
271        model: &'static EntityModel,
272        visibility: QueryPlanVisibility,
273    ) -> VisibleIndexes<'static> {
274        match visibility {
275            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
276            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
277        }
278    }
279
280    #[cfg(test)]
281    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
282        self.with_query_plan_cache(|cache| cache.len())
283    }
284
285    #[cfg(test)]
286    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
287        self.with_query_plan_cache(QueryPlanCache::clear);
288    }
289
290    pub(in crate::db) fn query_plan_visibility_for_store_path(
291        &self,
292        store_path: &'static str,
293    ) -> Result<QueryPlanVisibility, QueryError> {
294        let store = self
295            .db
296            .recovered_store(store_path)
297            .map_err(QueryError::execute)?;
298        let visibility = if store.index_state() == crate::db::IndexState::Ready {
299            QueryPlanVisibility::StoreReady
300        } else {
301            QueryPlanVisibility::StoreNotReady
302        };
303
304        Ok(visibility)
305    }
306
307    pub(in crate::db) fn cached_shared_query_plan_for_authority(
308        &self,
309        authority: crate::db::executor::EntityAuthority,
310        schema_fingerprint: CommitSchemaFingerprint,
311        query: &StructuralQuery,
312    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
313        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
314        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
315        let planning_state = query.prepare_scalar_planning_state()?;
316        let normalized_predicate_fingerprint = planning_state
317            .normalized_predicate()
318            .map(predicate_fingerprint_normalized);
319        let cache_key =
320            QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
321                authority,
322                schema_fingerprint,
323                visibility,
324                query,
325                normalized_predicate_fingerprint,
326                SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
327            );
328
329        {
330            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
331            if let Some(prepared_plan) = cached {
332                return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
333            }
334        }
335
336        let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
337            &visible_indexes,
338            planning_state,
339        )?;
340        let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
341        self.with_query_plan_cache(|cache| {
342            cache.insert(cache_key, prepared_plan.clone());
343        });
344
345        Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
346    }
347
348    #[cfg(test)]
349    pub(in crate::db) fn query_plan_cache_key_for_tests(
350        authority: crate::db::executor::EntityAuthority,
351        schema_fingerprint: CommitSchemaFingerprint,
352        visibility: QueryPlanVisibility,
353        query: &StructuralQuery,
354        cache_method_version: u8,
355    ) -> QueryPlanCacheKey {
356        QueryPlanCacheKey::for_authority_with_method_version(
357            authority,
358            schema_fingerprint,
359            visibility,
360            query,
361            cache_method_version,
362        )
363    }
364
365    // Resolve the planner-visible index slice for one typed query exactly once
366    // at the session boundary before handing execution/planning off to query-owned logic.
367    fn with_query_visible_indexes<E, T>(
368        &self,
369        query: &Query<E>,
370        op: impl FnOnce(
371            &Query<E>,
372            &crate::db::query::plan::VisibleIndexes<'static>,
373        ) -> Result<T, QueryError>,
374    ) -> Result<T, QueryError>
375    where
376        E: EntityKind<Canister = C>,
377    {
378        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
379        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
380
381        op(query, &visible_indexes)
382    }
383
384    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
385        &self,
386        query: &Query<E>,
387    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
388    where
389        E: EntityKind<Canister = C>,
390    {
391        let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
392
393        Ok((prepared_plan.typed_clone::<E>(), attribution))
394    }
395
396    // Resolve one typed query through the shared lower query-plan cache using
397    // the canonical authority and schema-fingerprint pair for that entity.
398    fn cached_shared_query_plan_for_entity<E>(
399        &self,
400        query: &Query<E>,
401    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
402    where
403        E: EntityKind<Canister = C>,
404    {
405        self.cached_shared_query_plan_for_authority(
406            crate::db::executor::EntityAuthority::for_type::<E>(),
407            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
408            query.structural(),
409        )
410    }
411
412    // Map one typed query onto one cached lower prepared plan so query-owned
413    // planned and compiled wrappers do not each repeat the same cache lookup.
414    fn map_cached_shared_query_plan_for_entity<E, T>(
415        &self,
416        query: &Query<E>,
417        map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
418    ) -> Result<T, QueryError>
419    where
420        E: EntityKind<Canister = C>,
421    {
422        let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
423
424        Ok(map(prepared_plan))
425    }
426
427    // Compile one typed query using only the indexes currently visible for the
428    // query's recovered store.
429    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
430        &self,
431        query: &Query<E>,
432    ) -> Result<CompiledQuery<E>, QueryError>
433    where
434        E: EntityKind<Canister = C>,
435    {
436        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
437    }
438
439    // Build one logical planned-query shell using only the indexes currently
440    // visible for the query's recovered store.
441    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
442        &self,
443        query: &Query<E>,
444    ) -> Result<PlannedQuery<E>, QueryError>
445    where
446        E: EntityKind<Canister = C>,
447    {
448        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
449    }
450
451    // Project one logical explain payload using only planner-visible indexes.
452    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
453        &self,
454        query: &Query<E>,
455    ) -> Result<ExplainPlan, QueryError>
456    where
457        E: EntityKind<Canister = C>,
458    {
459        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
460    }
461
462    // Hash one typed query plan using only the indexes currently visible for
463    // the query's recovered store.
464    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
465        &self,
466        query: &Query<E>,
467    ) -> Result<String, QueryError>
468    where
469        E: EntityKind<Canister = C>,
470    {
471        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
472    }
473
474    // Explain one load execution shape using only planner-visible
475    // indexes from the recovered store state.
476    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
477        &self,
478        query: &Query<E>,
479    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
480    where
481        E: EntityValue + EntityKind<Canister = C>,
482    {
483        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
484    }
485
486    // Render one load execution descriptor plus route diagnostics using
487    // only planner-visible indexes from the recovered store state.
488    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
489        &self,
490        query: &Query<E>,
491    ) -> Result<String, QueryError>
492    where
493        E: EntityValue + EntityKind<Canister = C>,
494    {
495        self.with_query_visible_indexes(query, |query, visible_indexes| {
496            let (prepared_plan, cache_attribution) =
497                self.cached_prepared_query_plan_for_entity(query)?;
498            let mut plan = prepared_plan.logical_plan().clone();
499
500            // Freeze the same planner-owned explain access-choice snapshot used
501            // by the direct non-cached explain path before rendering verbose
502            // diagnostics from the reused logical plan.
503            plan.finalize_access_choice_for_model_with_indexes(
504                query.structural().model(),
505                visible_indexes.as_slice(),
506            );
507
508            query
509                .structural()
510                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
511                    &plan,
512                    Some(query_plan_cache_reuse_event(cache_attribution)),
513                    |_| {},
514                )
515                .map(|diagnostics| diagnostics.render_text_verbose())
516        })
517    }
518
519    // Explain one prepared fluent aggregate terminal using only
520    // planner-visible indexes from the recovered store state.
521    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
522        &self,
523        query: &Query<E>,
524        strategy: &S,
525    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
526    where
527        E: EntityValue + EntityKind<Canister = C>,
528        S: PreparedFluentAggregateExplainStrategy,
529    {
530        self.with_query_visible_indexes(query, |query, visible_indexes| {
531            query
532                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
533        })
534    }
535
536    // Explain one `bytes_by(field)` terminal using only planner-visible
537    // indexes from the recovered store state.
538    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
539        &self,
540        query: &Query<E>,
541        target_field: &str,
542    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
543    where
544        E: EntityValue + EntityKind<Canister = C>,
545    {
546        self.with_query_visible_indexes(query, |query, visible_indexes| {
547            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
548        })
549    }
550
551    // Explain one prepared fluent projection terminal using only
552    // planner-visible indexes from the recovered store state.
553    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
554        &self,
555        query: &Query<E>,
556        strategy: &PreparedFluentProjectionStrategy,
557    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
558    where
559        E: EntityValue + EntityKind<Canister = C>,
560    {
561        self.with_query_visible_indexes(query, |query, visible_indexes| {
562            query.explain_prepared_projection_terminal_with_visible_indexes(
563                visible_indexes,
564                strategy,
565            )
566        })
567    }
568
569    // Validate that one execution strategy is admissible for scalar paged load
570    // execution and fail closed on grouped/primary-key-only routes.
571    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
572        match family {
573            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
574                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
575            )),
576            ExecutionFamily::Ordered => Ok(()),
577            ExecutionFamily::Grouped => Err(QueryError::invariant(
578                "grouped queries execute via execute(), not page().execute()",
579            )),
580        }
581    }
582
583    // Validate that one execution strategy is admissible for the grouped
584    // execution surface.
585    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
586        match family {
587            ExecutionFamily::Grouped => Ok(()),
588            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
589                "grouped execution requires grouped logical plans",
590            )),
591        }
592    }
593
594    // Finalize one grouped cursor page into the outward grouped execution
595    // payload so grouped cursor encoding and continuation-shape validation
596    // stay owned by the session boundary.
597    fn finalize_grouped_execution_page(
598        page: GroupedCursorPage,
599        trace: Option<ExecutionTrace>,
600    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
601        let next_cursor = page
602            .next_cursor
603            .map(|token| {
604                let Some(token) = token.as_grouped() else {
605                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
606                };
607
608                token.encode().map_err(|err| {
609                    QueryError::serialize_internal(format!(
610                        "failed to serialize grouped continuation cursor: {err}"
611                    ))
612                })
613            })
614            .transpose()?;
615
616        Ok(PagedGroupedExecutionWithTrace::new(
617            page.rows
618                .into_iter()
619                .map(crate::db::GroupedRow::from_runtime_row)
620                .collect(),
621            next_cursor,
622            trace,
623        ))
624    }
625
626    /// Execute one scalar load/delete query and return materialized response rows.
627    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
628    where
629        E: PersistedRow<Canister = C> + EntityValue,
630    {
631        // Phase 1: compile typed intent into one prepared execution-plan contract.
632        let mode = query.mode();
633        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
634
635        // Phase 2: delegate execution to the shared compiled-plan entry path.
636        self.execute_query_dyn(mode, plan)
637    }
638
639    /// Execute one typed query while reporting the compile/execute split at
640    /// the shared fluent query seam.
641    #[cfg(feature = "diagnostics")]
642    #[doc(hidden)]
643    #[expect(
644        clippy::too_many_lines,
645        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
646    )]
647    pub fn execute_query_result_with_attribution<E>(
648        &self,
649        query: &Query<E>,
650    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
651    where
652        E: PersistedRow<Canister = C> + EntityValue,
653    {
654        // Phase 1: measure compile work at the typed/fluent boundary,
655        // including the shared lower query-plan cache lookup/build exactly
656        // once. This preserves honest hit/miss attribution without
657        // double-building plans on one-shot cache misses.
658        let (compile_local_instructions, plan_and_cache) =
659            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
660        let (plan, cache_attribution) = plan_and_cache?;
661
662        // Phase 2: execute one query result using the prepared plan produced
663        // by the compile/cache boundary above.
664        let (execute_local_instructions, result) = measure_query_stage(
665            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
666                if query.has_grouping() {
667                    let (page, trace, phase_attribution) =
668                        self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
669                            executor
670                                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
671                                    plan, cursor,
672                                )
673                        })?;
674                    let grouped = Self::finalize_grouped_execution_page(page, trace)?;
675
676                    Ok((
677                        LoadQueryResult::Grouped(grouped),
678                        Self::grouped_query_execute_phase_attribution(phase_attribution),
679                        0,
680                    ))
681                } else {
682                    match query.mode() {
683                        QueryMode::Load(_) => {
684                            let (rows, phase_attribution, response_decode_local_instructions) =
685                                self.load_executor::<E>()
686                                    .execute_with_phase_attribution(plan)
687                                    .map_err(QueryError::execute)?;
688
689                            Ok((
690                                LoadQueryResult::Rows(rows),
691                                Self::scalar_query_execute_phase_attribution(phase_attribution),
692                                response_decode_local_instructions,
693                            ))
694                        }
695                        QueryMode::Delete(_) => {
696                            let result = self.execute_query_dyn(query.mode(), plan)?;
697
698                            Ok((
699                                LoadQueryResult::Rows(result),
700                                Self::empty_query_execute_phase_attribution(),
701                                0,
702                            ))
703                        }
704                    }
705                }
706            },
707        );
708        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
709        let total_local_instructions =
710            compile_local_instructions.saturating_add(execute_local_instructions);
711
712        Ok((
713            result,
714            QueryExecutionAttribution {
715                compile_local_instructions,
716                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
717                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
718                direct_data_row_scan_local_instructions: execute_phase_attribution
719                    .direct_data_row_scan_local_instructions,
720                direct_data_row_key_stream_local_instructions: execute_phase_attribution
721                    .direct_data_row_key_stream_local_instructions,
722                direct_data_row_row_read_local_instructions: execute_phase_attribution
723                    .direct_data_row_row_read_local_instructions,
724                direct_data_row_key_encode_local_instructions: execute_phase_attribution
725                    .direct_data_row_key_encode_local_instructions,
726                direct_data_row_store_get_local_instructions: execute_phase_attribution
727                    .direct_data_row_store_get_local_instructions,
728                direct_data_row_order_window_local_instructions: execute_phase_attribution
729                    .direct_data_row_order_window_local_instructions,
730                direct_data_row_page_window_local_instructions: execute_phase_attribution
731                    .direct_data_row_page_window_local_instructions,
732                grouped_stream_local_instructions: execute_phase_attribution
733                    .grouped_stream_local_instructions,
734                grouped_fold_local_instructions: execute_phase_attribution
735                    .grouped_fold_local_instructions,
736                grouped_finalize_local_instructions: execute_phase_attribution
737                    .grouped_finalize_local_instructions,
738                grouped_count_borrowed_hash_computations: execute_phase_attribution
739                    .grouped_count
740                    .borrowed_hash_computations,
741                grouped_count_bucket_candidate_checks: execute_phase_attribution
742                    .grouped_count
743                    .bucket_candidate_checks,
744                grouped_count_existing_group_hits: execute_phase_attribution
745                    .grouped_count
746                    .existing_group_hits,
747                grouped_count_new_group_inserts: execute_phase_attribution
748                    .grouped_count
749                    .new_group_inserts,
750                grouped_count_row_materialization_local_instructions: execute_phase_attribution
751                    .grouped_count
752                    .row_materialization_local_instructions,
753                grouped_count_group_lookup_local_instructions: execute_phase_attribution
754                    .grouped_count
755                    .group_lookup_local_instructions,
756                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
757                    .grouped_count
758                    .existing_group_update_local_instructions,
759                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
760                    .grouped_count
761                    .new_group_insert_local_instructions,
762                response_decode_local_instructions,
763                execute_local_instructions,
764                total_local_instructions,
765                shared_query_plan_cache_hits: cache_attribution.hits,
766                shared_query_plan_cache_misses: cache_attribution.misses,
767            },
768        ))
769    }
770
771    // Execute one typed query through the unified row/grouped result surface so
772    // higher layers do not need to branch on grouped shape themselves.
773    #[doc(hidden)]
774    pub fn execute_query_result<E>(
775        &self,
776        query: &Query<E>,
777    ) -> Result<LoadQueryResult<E>, QueryError>
778    where
779        E: PersistedRow<Canister = C> + EntityValue,
780    {
781        if query.has_grouping() {
782            return self
783                .execute_grouped(query, None)
784                .map(LoadQueryResult::Grouped);
785        }
786
787        self.execute_query(query).map(LoadQueryResult::Rows)
788    }
789
790    /// Execute one typed delete query and return only the affected-row count.
791    #[doc(hidden)]
792    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
793    where
794        E: PersistedRow<Canister = C> + EntityValue,
795    {
796        // Phase 1: fail closed if the caller routes a non-delete query here.
797        if !query.mode().is_delete() {
798            return Err(QueryError::unsupported_query(
799                "delete count execution requires delete query mode",
800            ));
801        }
802
803        // Phase 2: resolve one cached prepared execution-plan contract directly
804        // from the shared lower boundary instead of rebuilding it through the
805        // typed compiled-query wrapper.
806        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
807
808        // Phase 3: execute the shared delete core while skipping response-row materialization.
809        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
810            .map_err(QueryError::execute)
811    }
812
813    /// Execute one scalar query from one pre-built prepared execution contract.
814    ///
815    /// This is the shared compiled-plan entry boundary used by the typed
816    /// `execute_query(...)` surface and adjacent query execution facades.
817    pub(in crate::db) fn execute_query_dyn<E>(
818        &self,
819        mode: QueryMode,
820        plan: PreparedExecutionPlan<E>,
821    ) -> Result<EntityResponse<E>, QueryError>
822    where
823        E: PersistedRow<Canister = C> + EntityValue,
824    {
825        let result = match mode {
826            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
827            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
828        };
829
830        result.map_err(QueryError::execute)
831    }
832
833    // Shared load-query terminal wrapper: build plan, run under metrics, map
834    // execution errors into query-facing errors.
835    pub(in crate::db) fn execute_load_query_with<E, T>(
836        &self,
837        query: &Query<E>,
838        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
839    ) -> Result<T, QueryError>
840    where
841        E: PersistedRow<Canister = C> + EntityValue,
842    {
843        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
844
845        self.with_metrics(|| op(self.load_executor::<E>(), plan))
846            .map_err(QueryError::execute)
847    }
848
849    /// Build one trace payload for a query without executing it.
850    ///
851    /// This lightweight surface is intended for developer diagnostics:
852    /// plan hash, access strategy summary, and planner/executor route shape.
853    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
854    where
855        E: EntityKind<Canister = C>,
856    {
857        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
858        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
859        let (prepared_plan, cache_attribution) =
860            self.cached_prepared_query_plan_for_entity::<E>(query)?;
861        let logical_plan = prepared_plan.logical_plan();
862        let explain = logical_plan.explain();
863        let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
864        let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
865        let execution_family = match query.mode() {
866            QueryMode::Load(_) => Some(
867                prepared_plan
868                    .execution_family()
869                    .map_err(QueryError::execute)?,
870            ),
871            QueryMode::Delete(_) => None,
872        };
873        let reuse = query_plan_cache_reuse_event(cache_attribution);
874
875        Ok(QueryTracePlan::new(
876            plan_hash,
877            access_strategy,
878            execution_family,
879            reuse,
880            explain,
881        ))
882    }
883
884    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
885    pub(crate) fn execute_load_query_paged_with_trace<E>(
886        &self,
887        query: &Query<E>,
888        cursor_token: Option<&str>,
889    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
890    where
891        E: PersistedRow<Canister = C> + EntityValue,
892    {
893        // Phase 1: build/validate prepared execution plan and reject grouped plans.
894        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
895        Self::ensure_scalar_paged_execution_family(
896            plan.execution_family().map_err(QueryError::execute)?,
897        )?;
898
899        // Phase 2: decode external cursor token and validate it against plan surface.
900        let cursor_bytes = decode_optional_cursor_token(cursor_token)
901            .map_err(QueryError::from_cursor_plan_error)?;
902        let cursor = plan
903            .prepare_cursor(cursor_bytes.as_deref())
904            .map_err(QueryError::from_executor_plan_error)?;
905
906        // Phase 3: execute one traced page and encode outbound continuation token.
907        let (page, trace) = self
908            .with_metrics(|| {
909                self.load_executor::<E>()
910                    .execute_paged_with_cursor_traced(plan, cursor)
911            })
912            .map_err(QueryError::execute)?;
913        let next_cursor = page
914            .next_cursor
915            .map(|token| {
916                let Some(token) = token.as_scalar() else {
917                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
918                };
919
920                token.encode().map_err(|err| {
921                    QueryError::serialize_internal(format!(
922                        "failed to serialize continuation cursor: {err}"
923                    ))
924                })
925            })
926            .transpose()?;
927
928        Ok(PagedLoadExecutionWithTrace::new(
929            page.items,
930            next_cursor,
931            trace,
932        ))
933    }
934
935    /// Execute one grouped query page with optional grouped continuation cursor.
936    ///
937    /// This is the explicit grouped execution boundary; scalar load APIs reject
938    /// grouped plans to preserve scalar response contracts.
939    pub(in crate::db) fn execute_grouped<E>(
940        &self,
941        query: &Query<E>,
942        cursor_token: Option<&str>,
943    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
944    where
945        E: PersistedRow<Canister = C> + EntityValue,
946    {
947        // Phase 1: build the prepared execution plan once from the typed query.
948        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
949
950        // Phase 2: reuse the shared prepared grouped execution path and then
951        // finalize the outward grouped payload at the session boundary.
952        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
953
954        Self::finalize_grouped_execution_page(page, trace)
955    }
956
957    // Execute one grouped prepared plan page with optional grouped cursor
958    // while letting the caller choose the final grouped-runtime dispatch.
959    fn execute_grouped_plan_with<E, T>(
960        &self,
961        plan: PreparedExecutionPlan<E>,
962        cursor_token: Option<&str>,
963        op: impl FnOnce(
964            LoadExecutor<E>,
965            PreparedExecutionPlan<E>,
966            crate::db::cursor::GroupedPlannedCursor,
967        ) -> Result<T, InternalError>,
968    ) -> Result<T, QueryError>
969    where
970        E: PersistedRow<Canister = C> + EntityValue,
971    {
972        // Phase 1: validate the prepared plan shape before decoding cursors.
973        Self::ensure_grouped_execution_family(
974            plan.execution_family().map_err(QueryError::execute)?,
975        )?;
976
977        // Phase 2: decode external grouped cursor token and validate against plan.
978        let cursor = decode_optional_grouped_cursor_token(cursor_token)
979            .map_err(QueryError::from_cursor_plan_error)?;
980        let cursor = plan
981            .prepare_grouped_cursor_token(cursor)
982            .map_err(QueryError::from_executor_plan_error)?;
983
984        // Phase 3: execute one grouped page while preserving the structural
985        // grouped cursor payload for whichever outward cursor format the caller needs.
986        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
987            .map_err(QueryError::execute)
988    }
989
990    // Execute one grouped prepared plan page with optional grouped cursor.
991    fn execute_grouped_plan_with_trace<E>(
992        &self,
993        plan: PreparedExecutionPlan<E>,
994        cursor_token: Option<&str>,
995    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
996    where
997        E: PersistedRow<Canister = C> + EntityValue,
998    {
999        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1000            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1001        })
1002    }
1003}
1004
1005impl QueryPlanCacheKey {
1006    // Assemble the canonical cache-key shell once so the test and
1007    // normalized-predicate constructors only decide which structural query key
1008    // they feed into the shared session cache identity.
1009    const fn from_authority_parts(
1010        authority: crate::db::executor::EntityAuthority,
1011        schema_fingerprint: CommitSchemaFingerprint,
1012        visibility: QueryPlanVisibility,
1013        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1014        cache_method_version: u8,
1015    ) -> Self {
1016        Self {
1017            cache_method_version,
1018            entity_path: authority.entity_path(),
1019            schema_fingerprint,
1020            visibility,
1021            structural_query,
1022        }
1023    }
1024
1025    #[cfg(test)]
1026    fn for_authority_with_method_version(
1027        authority: crate::db::executor::EntityAuthority,
1028        schema_fingerprint: CommitSchemaFingerprint,
1029        visibility: QueryPlanVisibility,
1030        query: &StructuralQuery,
1031        cache_method_version: u8,
1032    ) -> Self {
1033        Self::from_authority_parts(
1034            authority,
1035            schema_fingerprint,
1036            visibility,
1037            query.structural_cache_key(),
1038            cache_method_version,
1039        )
1040    }
1041
1042    fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1043        authority: crate::db::executor::EntityAuthority,
1044        schema_fingerprint: CommitSchemaFingerprint,
1045        visibility: QueryPlanVisibility,
1046        query: &StructuralQuery,
1047        normalized_predicate_fingerprint: Option<[u8; 32]>,
1048        cache_method_version: u8,
1049    ) -> Self {
1050        Self::from_authority_parts(
1051            authority,
1052            schema_fingerprint,
1053            visibility,
1054            query.structural_cache_key_with_normalized_predicate_fingerprint(
1055                normalized_predicate_fingerprint,
1056            ),
1057            cache_method_version,
1058        )
1059    }
1060}