Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12    db::{
13        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15        TraceExecutionFamily, TraceReuseArtifactClass, TraceReuseEvent,
16        access::summarize_executable_access_plan,
17        commit::CommitSchemaFingerprint,
18        cursor::{
19            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
20        },
21        diagnostics::ExecutionTrace,
22        executor::{
23            ExecutionFamily, ExecutorPlanError, GroupedCursorPage, LoadExecutor,
24            PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
25            ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
26            ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
27            SharedPreparedExecutionPlan,
28        },
29        predicate::predicate_fingerprint_normalized,
30        query::builder::{
31            PreparedFluentAggregateExplainStrategy,
32            PreparedFluentExistingRowsTerminalRuntimeRequest,
33            PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldRuntimeRequest,
34            PreparedFluentNumericFieldStrategy, PreparedFluentOrderSensitiveTerminalRuntimeRequest,
35            PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
36            PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalRuntimeRequest,
37            PreparedFluentScalarTerminalStrategy,
38        },
39        query::explain::{
40            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
41        },
42        query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
43        query::{
44            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
45            plan::{AccessPlannedQuery, FieldSlot, QueryMode, VisibleIndexes},
46        },
47    },
48    error::InternalError,
49    model::entity::EntityModel,
50    traits::{CanisterKind, EntityKind, EntityValue, Path},
51    types::{Decimal, Id},
52    value::Value,
53};
54#[cfg(feature = "diagnostics")]
55use candid::CandidType;
56#[cfg(feature = "diagnostics")]
57use serde::Deserialize;
58use std::{cell::RefCell, collections::HashMap};
59
// Version tag passed into every shared query-plan cache key built by the
// session lookup path.
// Bump this when the shared lower query-plan cache key meaning changes in a
// way that must force old in-heap entries to miss instead of aliasing.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 2;
63
///
/// QueryPlanVisibility
///
/// Planner-facing readiness of one recovered store. The session derives it
/// from the store's index state and uses it both to choose the visible index
/// set handed to the planner and as one component of the plan-cache key.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
    // Index state is anything other than `Ready`: planning sees no indexes.
    StoreNotReady,
    // Index state is `Ready`: planning sees the model's planner-visible indexes.
    StoreReady,
}
69
///
/// QueryPlanCacheKey
///
/// Hashable identity of one entry in the shared query-plan cache. Two lookups
/// reuse the same prepared plan only when every field below compares equal.
///
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
    // Key-method version; the session lookup path passes
    // `SHARED_QUERY_PLAN_CACHE_METHOD_VERSION` here.
    cache_method_version: u8,
    // NOTE(review): presumably the entity path taken from the authority the
    // key was built for — confirm against the constructor.
    entity_path: &'static str,
    // Commit schema fingerprint supplied by the caller of the cache lookup.
    schema_fingerprint: CommitSchemaFingerprint,
    // Store visibility resolved at lookup time.
    visibility: QueryPlanVisibility,
    // Structural-query portion of the key.
    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
78
// One plan cache: maps a full cache key onto the shared prepared execution
// plan stored for it.
pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan>;
80
thread_local! {
    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
    // facades can share prepared logical plans across update/query calls while
    // tests and multi-registry host processes remain isolated by registry
    // identity.
    //
    // The outer map is keyed by the `usize` scope id that the session reads
    // from its database handle; each value is that scope's own plan cache.
    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
        RefCell::new(HashMap::default());
}
89
///
/// QueryPlanCacheAttribution
///
/// Hit/miss counters describing how shared query-plan cache lookups were
/// served. The default value records no lookups at all.
///
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
    // Number of lookups answered from an existing cache entry.
    pub hits: u64,
    // Number of lookups that found no cache entry.
    pub misses: u64,
}
95
96impl QueryPlanCacheAttribution {
97    #[must_use]
98    const fn hit() -> Self {
99        Self { hits: 1, misses: 0 }
100    }
101
102    #[must_use]
103    const fn miss() -> Self {
104        Self { hits: 0, misses: 1 }
105    }
106}
107
108// Map one shared query-plan cache attribution outcome onto the explicit reuse
109// event shipped in `0.109.0`.
110pub(in crate::db::session) const fn query_plan_cache_reuse_event(
111    attribution: QueryPlanCacheAttribution,
112) -> TraceReuseEvent {
113    if attribution.hits > 0 {
114        TraceReuseEvent::hit(TraceReuseArtifactClass::SharedPreparedQueryPlan)
115    } else {
116        TraceReuseEvent::miss(TraceReuseArtifactClass::SharedPreparedQueryPlan)
117    }
118}
119
120// Translate executor route-family selection into the query-owned trace label
121// at the session boundary so trace DTOs do not depend on executor types.
122const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
123    match family {
124        ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
125        ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
126        ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
127    }
128}
129
130// Convert executor plan-surface failures at the session boundary so query error
131// types do not import executor-owned error enums.
132pub(in crate::db::session) fn query_error_from_executor_plan_error(
133    err: ExecutorPlanError,
134) -> QueryError {
135    match err {
136        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
137    }
138}
139
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution records the top-level compile/execute split for
/// typed/fluent query execution at the session boundary.
///
/// Every counter is a `u64`. The `*_local_instructions` fields carry
/// instruction-counter deltas; the `grouped_count_*` fields without that
/// suffix are plain event counts.
///
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    // Compile stage: the cached prepared-plan lookup/build.
    pub compile_local_instructions: u64,
    // Execute-phase split reported by the executor.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    // Direct data-row path breakdown (scalar loads).
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    // Grouped execution breakdown.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    // Grouped-count fold counters.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    // Response decoding plus the overall execute/total figures.
    pub response_decode_local_instructions: u64,
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,
    // Shared query-plan cache outcome for this execution.
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
176
// Session-local normalization of the executor's scalar and grouped phase
// attribution into one shape, so the diagnostics path can assemble the
// outward attribution record without branching on the execution family.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    // Direct data-row counters: populated for scalar execution, zero otherwise.
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    // Grouped counters: populated for grouped execution, zero otherwise.
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    grouped_count: GroupedCountAttribution,
}
194
// Read the current local instruction counter used for diagnostics deltas.
// On `wasm32` this queries the runtime performance counter; on every other
// target it returns `0`, so measured deltas are always zero off-chain.
#[cfg(feature = "diagnostics")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        // NOTE(review): counter type `1` presumably selects the call-context
        // instruction counter — confirm against the `canic_cdk` API.
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
211
// Run one fallible query stage and report how many local instructions it
// consumed alongside its untouched result. The delta saturates at zero if the
// counter ever reads lower after the stage than before it.
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
220
221impl<C: CanisterKind> DbSession<C> {
222    #[cfg(feature = "diagnostics")]
223    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
224        QueryExecutePhaseAttribution {
225            runtime_local_instructions: 0,
226            finalize_local_instructions: 0,
227            direct_data_row_scan_local_instructions: 0,
228            direct_data_row_key_stream_local_instructions: 0,
229            direct_data_row_row_read_local_instructions: 0,
230            direct_data_row_key_encode_local_instructions: 0,
231            direct_data_row_store_get_local_instructions: 0,
232            direct_data_row_order_window_local_instructions: 0,
233            direct_data_row_page_window_local_instructions: 0,
234            grouped_stream_local_instructions: 0,
235            grouped_fold_local_instructions: 0,
236            grouped_finalize_local_instructions: 0,
237            grouped_count: GroupedCountAttribution::none(),
238        }
239    }
240
241    #[cfg(feature = "diagnostics")]
242    const fn scalar_query_execute_phase_attribution(
243        phase: ScalarExecutePhaseAttribution,
244    ) -> QueryExecutePhaseAttribution {
245        QueryExecutePhaseAttribution {
246            runtime_local_instructions: phase.runtime_local_instructions,
247            finalize_local_instructions: phase.finalize_local_instructions,
248            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
249            direct_data_row_key_stream_local_instructions: phase
250                .direct_data_row_key_stream_local_instructions,
251            direct_data_row_row_read_local_instructions: phase
252                .direct_data_row_row_read_local_instructions,
253            direct_data_row_key_encode_local_instructions: phase
254                .direct_data_row_key_encode_local_instructions,
255            direct_data_row_store_get_local_instructions: phase
256                .direct_data_row_store_get_local_instructions,
257            direct_data_row_order_window_local_instructions: phase
258                .direct_data_row_order_window_local_instructions,
259            direct_data_row_page_window_local_instructions: phase
260                .direct_data_row_page_window_local_instructions,
261            grouped_stream_local_instructions: 0,
262            grouped_fold_local_instructions: 0,
263            grouped_finalize_local_instructions: 0,
264            grouped_count: GroupedCountAttribution::none(),
265        }
266    }
267
268    #[cfg(feature = "diagnostics")]
269    const fn grouped_query_execute_phase_attribution(
270        phase: GroupedExecutePhaseAttribution,
271    ) -> QueryExecutePhaseAttribution {
272        QueryExecutePhaseAttribution {
273            runtime_local_instructions: phase
274                .stream_local_instructions
275                .saturating_add(phase.fold_local_instructions),
276            finalize_local_instructions: phase.finalize_local_instructions,
277            direct_data_row_scan_local_instructions: 0,
278            direct_data_row_key_stream_local_instructions: 0,
279            direct_data_row_row_read_local_instructions: 0,
280            direct_data_row_key_encode_local_instructions: 0,
281            direct_data_row_store_get_local_instructions: 0,
282            direct_data_row_order_window_local_instructions: 0,
283            direct_data_row_page_window_local_instructions: 0,
284            grouped_stream_local_instructions: phase.stream_local_instructions,
285            grouped_fold_local_instructions: phase.fold_local_instructions,
286            grouped_finalize_local_instructions: phase.finalize_local_instructions,
287            grouped_count: phase.grouped_count,
288        }
289    }
290
291    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
292        let scope_id = self.db.cache_scope_id();
293
294        QUERY_PLAN_CACHES.with(|caches| {
295            let mut caches = caches.borrow_mut();
296            let cache = caches.entry(scope_id).or_default();
297
298            f(cache)
299        })
300    }
301
302    const fn visible_indexes_for_model(
303        model: &'static EntityModel,
304        visibility: QueryPlanVisibility,
305    ) -> VisibleIndexes<'static> {
306        match visibility {
307            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
308            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
309        }
310    }
311
312    #[cfg(test)]
313    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
314        self.with_query_plan_cache(|cache| cache.len())
315    }
316
    // Test-only reset: drop every prepared plan cached for this session's
    // cache scope so the next lookup is guaranteed to miss.
    #[cfg(test)]
    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
        self.with_query_plan_cache(QueryPlanCache::clear);
    }
321
322    pub(in crate::db) fn query_plan_visibility_for_store_path(
323        &self,
324        store_path: &'static str,
325    ) -> Result<QueryPlanVisibility, QueryError> {
326        let store = self
327            .db
328            .recovered_store(store_path)
329            .map_err(QueryError::execute)?;
330        let visibility = if store.index_state() == crate::db::IndexState::Ready {
331            QueryPlanVisibility::StoreReady
332        } else {
333            QueryPlanVisibility::StoreNotReady
334        };
335
336        Ok(visibility)
337    }
338
339    pub(in crate::db) fn cached_shared_query_plan_for_authority(
340        &self,
341        authority: crate::db::executor::EntityAuthority,
342        schema_fingerprint: CommitSchemaFingerprint,
343        query: &StructuralQuery,
344    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
345        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
346        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
347        let planning_state = query.prepare_scalar_planning_state()?;
348        let normalized_predicate_fingerprint = planning_state
349            .normalized_predicate()
350            .map(predicate_fingerprint_normalized);
351        let cache_key =
352            QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
353                authority,
354                schema_fingerprint,
355                visibility,
356                query,
357                normalized_predicate_fingerprint,
358                SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
359            );
360
361        {
362            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
363            if let Some(prepared_plan) = cached {
364                return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
365            }
366        }
367
368        let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
369            &visible_indexes,
370            planning_state,
371        )?;
372        let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
373        self.with_query_plan_cache(|cache| {
374            cache.insert(cache_key, prepared_plan.clone());
375        });
376
377        Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
378    }
379
    // Test-only constructor for one plan-cache key with an explicit
    // cache-method version, so tests can compare keys across versions
    // without going through a live cache lookup.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_key_for_tests(
        authority: crate::db::executor::EntityAuthority,
        schema_fingerprint: CommitSchemaFingerprint,
        visibility: QueryPlanVisibility,
        query: &StructuralQuery,
        cache_method_version: u8,
    ) -> QueryPlanCacheKey {
        QueryPlanCacheKey::for_authority_with_method_version(
            authority,
            schema_fingerprint,
            visibility,
            query,
            cache_method_version,
        )
    }
396
397    // Resolve the planner-visible index slice for one typed query exactly once
398    // at the session boundary before handing execution/planning off to query-owned logic.
399    fn with_query_visible_indexes<E, T>(
400        &self,
401        query: &Query<E>,
402        op: impl FnOnce(
403            &Query<E>,
404            &crate::db::query::plan::VisibleIndexes<'static>,
405        ) -> Result<T, QueryError>,
406    ) -> Result<T, QueryError>
407    where
408        E: EntityKind<Canister = C>,
409    {
410        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
411        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
412
413        op(query, &visible_indexes)
414    }
415
416    pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
417        &self,
418        query: &Query<E>,
419    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
420    where
421        E: EntityKind<Canister = C>,
422    {
423        let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
424
425        Ok((prepared_plan.typed_clone::<E>(), attribution))
426    }
427
428    // Resolve one typed query through the shared lower query-plan cache using
429    // the canonical authority and schema-fingerprint pair for that entity.
430    fn cached_shared_query_plan_for_entity<E>(
431        &self,
432        query: &Query<E>,
433    ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
434    where
435        E: EntityKind<Canister = C>,
436    {
437        self.cached_shared_query_plan_for_authority(
438            crate::db::executor::EntityAuthority::for_type::<E>(),
439            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
440            query.structural(),
441        )
442    }
443
444    // Map one typed query onto one cached lower prepared plan so session-owned
445    // planned and compiled wrappers reuse the same cache lookup while returning
446    // query-owned neutral plan DTOs.
447    fn map_cached_shared_query_plan_for_entity<E, T>(
448        &self,
449        query: &Query<E>,
450        map: impl FnOnce(AccessPlannedQuery) -> T,
451    ) -> Result<T, QueryError>
452    where
453        E: EntityKind<Canister = C>,
454    {
455        let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
456
457        Ok(map(prepared_plan.logical_plan().clone()))
458    }
459
    // Compile one typed query using only the indexes currently visible for the
    // query's recovered store.
    // The logical plan comes from the shared query-plan cache lookup and is
    // wrapped via `CompiledQuery::from_plan`.
    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<CompiledQuery<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
    }
471
    // Build one logical planned-query shell using only the indexes currently
    // visible for the query's recovered store.
    // The logical plan comes from the shared query-plan cache lookup and is
    // wrapped via `PlannedQuery::from_plan`.
    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<PlannedQuery<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
    }
483
    // Project one logical explain payload using only planner-visible indexes.
    // Visibility is resolved from the recovered store, then the work is handed
    // to `Query::explain_with_visible_indexes`.
    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainPlan, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
    }
494
    // Hash one typed query plan using only the indexes currently visible for
    // the query's recovered store.
    // Visibility is resolved from the recovered store, then the work is handed
    // to `Query::plan_hash_hex_with_visible_indexes`, whose `String` result is
    // returned unchanged.
    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
    }
506
    // Explain one load execution shape using only planner-visible
    // indexes from the recovered store state.
    // Visibility is resolved from the recovered store, then the work is handed
    // to `Query::explain_execution_with_visible_indexes`.
    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
    }
518
519    // Render one load execution descriptor plus route diagnostics using
520    // only planner-visible indexes from the recovered store state.
521    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
522        &self,
523        query: &Query<E>,
524    ) -> Result<String, QueryError>
525    where
526        E: EntityValue + EntityKind<Canister = C>,
527    {
528        self.with_query_visible_indexes(query, |query, visible_indexes| {
529            let (prepared_plan, cache_attribution) =
530                self.cached_prepared_query_plan_for_entity(query)?;
531            let mut plan = prepared_plan.logical_plan().clone();
532
533            // Freeze the same planner-owned explain access-choice snapshot used
534            // by the direct non-cached explain path before rendering verbose
535            // diagnostics from the reused logical plan.
536            plan.finalize_access_choice_for_model_with_indexes(
537                query.structural().model(),
538                visible_indexes.as_slice(),
539            );
540
541            query
542                .structural()
543                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
544                    &plan,
545                    Some(query_plan_cache_reuse_event(cache_attribution)),
546                    |_| {},
547                )
548                .map(|diagnostics| diagnostics.render_text_verbose())
549        })
550    }
551
552    // Explain one prepared fluent aggregate terminal using only
553    // planner-visible indexes from the recovered store state.
554    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
555        &self,
556        query: &Query<E>,
557        strategy: &S,
558    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
559    where
560        E: EntityValue + EntityKind<Canister = C>,
561        S: PreparedFluentAggregateExplainStrategy,
562    {
563        self.with_query_visible_indexes(query, |query, visible_indexes| {
564            query
565                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
566        })
567    }
568
569    // Explain one `bytes_by(field)` terminal using only planner-visible
570    // indexes from the recovered store state.
571    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
572        &self,
573        query: &Query<E>,
574        target_field: &str,
575    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
576    where
577        E: EntityValue + EntityKind<Canister = C>,
578    {
579        self.with_query_visible_indexes(query, |query, visible_indexes| {
580            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
581        })
582    }
583
584    // Explain one prepared fluent projection terminal using only
585    // planner-visible indexes from the recovered store state.
586    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
587        &self,
588        query: &Query<E>,
589        strategy: &PreparedFluentProjectionStrategy,
590    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
591    where
592        E: EntityValue + EntityKind<Canister = C>,
593    {
594        self.with_query_visible_indexes(query, |query, visible_indexes| {
595            query.explain_prepared_projection_terminal_with_visible_indexes(
596                visible_indexes,
597                strategy,
598            )
599        })
600    }
601
602    // Validate that one execution strategy is admissible for scalar paged load
603    // execution and fail closed on grouped/primary-key-only routes.
604    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
605        match family {
606            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
607                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
608            )),
609            ExecutionFamily::Ordered => Ok(()),
610            ExecutionFamily::Grouped => Err(QueryError::invariant(
611                "grouped queries execute via execute(), not page().execute()",
612            )),
613        }
614    }
615
616    // Validate that one execution strategy is admissible for the grouped
617    // execution surface.
618    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
619        match family {
620            ExecutionFamily::Grouped => Ok(()),
621            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
622                "grouped execution requires grouped logical plans",
623            )),
624        }
625    }
626
627    // Finalize one grouped cursor page into the outward grouped execution
628    // payload so grouped cursor encoding and continuation-shape validation
629    // stay owned by the session boundary.
630    fn finalize_grouped_execution_page(
631        page: GroupedCursorPage,
632        trace: Option<ExecutionTrace>,
633    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
634        let next_cursor = page
635            .next_cursor
636            .map(|token| {
637                let Some(token) = token.as_grouped() else {
638                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
639                };
640
641                token.encode().map_err(|err| {
642                    QueryError::serialize_internal(format!(
643                        "failed to serialize grouped continuation cursor: {err}"
644                    ))
645                })
646            })
647            .transpose()?;
648
649        Ok(PagedGroupedExecutionWithTrace::new(
650            page.rows
651                .into_iter()
652                .map(crate::db::GroupedRow::from_runtime_row)
653                .collect(),
654            next_cursor,
655            trace,
656        ))
657    }
658
659    /// Execute one scalar load/delete query and return materialized response rows.
660    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
661    where
662        E: PersistedRow<Canister = C> + EntityValue,
663    {
664        // Phase 1: compile typed intent into one prepared execution-plan contract.
665        let mode = query.mode();
666        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
667
668        // Phase 2: delegate execution to the shared compiled-plan entry path.
669        self.execute_query_dyn(mode, plan)
670    }
671
672    /// Execute one typed query while reporting the compile/execute split at
673    /// the shared fluent query seam.
674    #[cfg(feature = "diagnostics")]
675    #[doc(hidden)]
676    #[expect(
677        clippy::too_many_lines,
678        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
679    )]
680    pub fn execute_query_result_with_attribution<E>(
681        &self,
682        query: &Query<E>,
683    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
684    where
685        E: PersistedRow<Canister = C> + EntityValue,
686    {
687        // Phase 1: measure compile work at the typed/fluent boundary,
688        // including the shared lower query-plan cache lookup/build exactly
689        // once. This preserves honest hit/miss attribution without
690        // double-building plans on one-shot cache misses.
691        let (compile_local_instructions, plan_and_cache) =
692            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
693        let (plan, cache_attribution) = plan_and_cache?;
694
695        // Phase 2: execute one query result using the prepared plan produced
696        // by the compile/cache boundary above.
697        let (execute_local_instructions, result) = measure_query_stage(
698            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
699                if query.has_grouping() {
700                    let (page, trace, phase_attribution) =
701                        self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
702                            executor
703                                .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
704                                    plan, cursor,
705                                )
706                        })?;
707                    let grouped = Self::finalize_grouped_execution_page(page, trace)?;
708
709                    Ok((
710                        LoadQueryResult::Grouped(grouped),
711                        Self::grouped_query_execute_phase_attribution(phase_attribution),
712                        0,
713                    ))
714                } else {
715                    match query.mode() {
716                        QueryMode::Load(_) => {
717                            let (rows, phase_attribution, response_decode_local_instructions) =
718                                self.load_executor::<E>()
719                                    .execute_with_phase_attribution(plan)
720                                    .map_err(QueryError::execute)?;
721
722                            Ok((
723                                LoadQueryResult::Rows(rows),
724                                Self::scalar_query_execute_phase_attribution(phase_attribution),
725                                response_decode_local_instructions,
726                            ))
727                        }
728                        QueryMode::Delete(_) => {
729                            let result = self.execute_query_dyn(query.mode(), plan)?;
730
731                            Ok((
732                                LoadQueryResult::Rows(result),
733                                Self::empty_query_execute_phase_attribution(),
734                                0,
735                            ))
736                        }
737                    }
738                }
739            },
740        );
741        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
742        let total_local_instructions =
743            compile_local_instructions.saturating_add(execute_local_instructions);
744
745        Ok((
746            result,
747            QueryExecutionAttribution {
748                compile_local_instructions,
749                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
750                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
751                direct_data_row_scan_local_instructions: execute_phase_attribution
752                    .direct_data_row_scan_local_instructions,
753                direct_data_row_key_stream_local_instructions: execute_phase_attribution
754                    .direct_data_row_key_stream_local_instructions,
755                direct_data_row_row_read_local_instructions: execute_phase_attribution
756                    .direct_data_row_row_read_local_instructions,
757                direct_data_row_key_encode_local_instructions: execute_phase_attribution
758                    .direct_data_row_key_encode_local_instructions,
759                direct_data_row_store_get_local_instructions: execute_phase_attribution
760                    .direct_data_row_store_get_local_instructions,
761                direct_data_row_order_window_local_instructions: execute_phase_attribution
762                    .direct_data_row_order_window_local_instructions,
763                direct_data_row_page_window_local_instructions: execute_phase_attribution
764                    .direct_data_row_page_window_local_instructions,
765                grouped_stream_local_instructions: execute_phase_attribution
766                    .grouped_stream_local_instructions,
767                grouped_fold_local_instructions: execute_phase_attribution
768                    .grouped_fold_local_instructions,
769                grouped_finalize_local_instructions: execute_phase_attribution
770                    .grouped_finalize_local_instructions,
771                grouped_count_borrowed_hash_computations: execute_phase_attribution
772                    .grouped_count
773                    .borrowed_hash_computations,
774                grouped_count_bucket_candidate_checks: execute_phase_attribution
775                    .grouped_count
776                    .bucket_candidate_checks,
777                grouped_count_existing_group_hits: execute_phase_attribution
778                    .grouped_count
779                    .existing_group_hits,
780                grouped_count_new_group_inserts: execute_phase_attribution
781                    .grouped_count
782                    .new_group_inserts,
783                grouped_count_row_materialization_local_instructions: execute_phase_attribution
784                    .grouped_count
785                    .row_materialization_local_instructions,
786                grouped_count_group_lookup_local_instructions: execute_phase_attribution
787                    .grouped_count
788                    .group_lookup_local_instructions,
789                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
790                    .grouped_count
791                    .existing_group_update_local_instructions,
792                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
793                    .grouped_count
794                    .new_group_insert_local_instructions,
795                response_decode_local_instructions,
796                execute_local_instructions,
797                total_local_instructions,
798                shared_query_plan_cache_hits: cache_attribution.hits,
799                shared_query_plan_cache_misses: cache_attribution.misses,
800            },
801        ))
802    }
803
804    // Execute one typed query through the unified row/grouped result surface so
805    // higher layers do not need to branch on grouped shape themselves.
806    #[doc(hidden)]
807    pub fn execute_query_result<E>(
808        &self,
809        query: &Query<E>,
810    ) -> Result<LoadQueryResult<E>, QueryError>
811    where
812        E: PersistedRow<Canister = C> + EntityValue,
813    {
814        if query.has_grouping() {
815            return self
816                .execute_grouped(query, None)
817                .map(LoadQueryResult::Grouped);
818        }
819
820        self.execute_query(query).map(LoadQueryResult::Rows)
821    }
822
823    /// Execute one typed delete query and return only the affected-row count.
824    #[doc(hidden)]
825    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
826    where
827        E: PersistedRow<Canister = C> + EntityValue,
828    {
829        // Phase 1: fail closed if the caller routes a non-delete query here.
830        if !query.mode().is_delete() {
831            return Err(QueryError::unsupported_query(
832                "delete count execution requires delete query mode",
833            ));
834        }
835
836        // Phase 2: resolve one cached prepared execution-plan contract directly
837        // from the shared lower boundary instead of rebuilding it through the
838        // typed compiled-query wrapper.
839        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
840
841        // Phase 3: execute the shared delete core while skipping response-row materialization.
842        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
843            .map_err(QueryError::execute)
844    }
845
846    /// Execute one scalar query from one pre-built prepared execution contract.
847    ///
848    /// This is the shared compiled-plan entry boundary used by the typed
849    /// `execute_query(...)` surface and adjacent query execution facades.
850    pub(in crate::db) fn execute_query_dyn<E>(
851        &self,
852        mode: QueryMode,
853        plan: PreparedExecutionPlan<E>,
854    ) -> Result<EntityResponse<E>, QueryError>
855    where
856        E: PersistedRow<Canister = C> + EntityValue,
857    {
858        let result = match mode {
859            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
860            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
861        };
862
863        result.map_err(QueryError::execute)
864    }
865
866    // Shared load-query terminal wrapper: build plan, run under metrics, map
867    // execution errors into query-facing errors.
868    pub(in crate::db) fn execute_load_query_with<E, T>(
869        &self,
870        query: &Query<E>,
871        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
872    ) -> Result<T, QueryError>
873    where
874        E: PersistedRow<Canister = C> + EntityValue,
875    {
876        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
877
878        self.with_metrics(|| op(self.load_executor::<E>(), plan))
879            .map_err(QueryError::execute)
880    }
881
882    // Execute one scalar terminal boundary and keep the executor-specific
883    // request/output types contained inside the session adapter.
884    fn execute_scalar_terminal_boundary<E>(
885        &self,
886        query: &Query<E>,
887        request: ScalarTerminalBoundaryRequest,
888    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
889    where
890        E: PersistedRow<Canister = C> + EntityValue,
891    {
892        self.execute_load_query_with(query, move |load, plan| {
893            load.execute_scalar_terminal_request(plan, request)
894        })
895    }
896
897    // Execute one projection terminal boundary and keep field projection
898    // executor details out of fluent query modules.
899    fn execute_scalar_projection_boundary<E>(
900        &self,
901        query: &Query<E>,
902        target_field: FieldSlot,
903        request: ScalarProjectionBoundaryRequest,
904    ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
905    where
906        E: PersistedRow<Canister = C> + EntityValue,
907    {
908        self.execute_load_query_with(query, move |load, plan| {
909            load.execute_scalar_projection_boundary(plan, target_field, request)
910        })
911    }
912
913    // Execute one fluent count/exists terminal through a query-owned result
914    // shape so fluent terminals do not import executor aggregate outputs.
915    pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
916        &self,
917        query: &Query<E>,
918        strategy: PreparedFluentExistingRowsTerminalStrategy,
919    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
920    where
921        E: PersistedRow<Canister = C> + EntityValue,
922    {
923        match strategy.into_runtime_request() {
924            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => self
925                .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Count)?
926                .into_count()
927                .map(FluentScalarTerminalOutput::Count)
928                .map_err(QueryError::execute),
929            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => self
930                .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Exists)?
931                .into_exists()
932                .map(FluentScalarTerminalOutput::Exists)
933                .map_err(QueryError::execute),
934        }
935    }
936
937    // Execute one fluent id/extrema terminal through a query-owned result
938    // shape after the session adapter has decoded storage keys into typed ids.
939    pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
940        &self,
941        query: &Query<E>,
942        strategy: PreparedFluentScalarTerminalStrategy,
943    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
944    where
945        E: PersistedRow<Canister = C> + EntityValue,
946    {
947        let request = match strategy.into_runtime_request() {
948            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
949                ScalarTerminalBoundaryRequest::IdTerminal { kind }
950            }
951            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
952                ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
953            }
954        };
955
956        self.execute_scalar_terminal_boundary(query, request)?
957            .into_id::<E>()
958            .map(FluentScalarTerminalOutput::Id)
959            .map_err(QueryError::execute)
960    }
961
962    // Execute one fluent order-sensitive terminal through the session adapter.
963    // The min/max pair request remains distinguished because it returns two ids.
964    pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
965        &self,
966        query: &Query<E>,
967        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
968    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
969    where
970        E: PersistedRow<Canister = C> + EntityValue,
971    {
972        match strategy.into_runtime_request() {
973            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => self
974                .execute_scalar_terminal_boundary(
975                    query,
976                    ScalarTerminalBoundaryRequest::IdTerminal { kind },
977                )?
978                .into_id::<E>()
979                .map(FluentScalarTerminalOutput::Id)
980                .map_err(QueryError::execute),
981            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
982                self.execute_scalar_terminal_boundary(
983                    query,
984                    ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth },
985                )?
986                .into_id::<E>()
987                .map(FluentScalarTerminalOutput::Id)
988                .map_err(QueryError::execute)
989            }
990            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
991                self.execute_scalar_terminal_boundary(
992                    query,
993                    ScalarTerminalBoundaryRequest::MedianBySlot { target_field },
994                )?
995                .into_id::<E>()
996                .map(FluentScalarTerminalOutput::Id)
997                .map_err(QueryError::execute)
998            }
999            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
1000                self.execute_scalar_terminal_boundary(
1001                    query,
1002                    ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field },
1003                )?
1004                .into_id_pair::<E>()
1005                .map(FluentScalarTerminalOutput::IdPair)
1006                .map_err(QueryError::execute)
1007            }
1008        }
1009    }
1010
1011    // Execute one fluent numeric-field terminal through the session-owned
1012    // request conversion layer.
1013    pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
1014        &self,
1015        query: &Query<E>,
1016        strategy: PreparedFluentNumericFieldStrategy,
1017    ) -> Result<Option<Decimal>, QueryError>
1018    where
1019        E: PersistedRow<Canister = C> + EntityValue,
1020    {
1021        let (target_field, runtime_request) = strategy.into_runtime_parts();
1022        let request = match runtime_request {
1023            PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
1024            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
1025                ScalarNumericFieldBoundaryRequest::SumDistinct
1026            }
1027            PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
1028            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
1029                ScalarNumericFieldBoundaryRequest::AvgDistinct
1030            }
1031        };
1032
1033        self.execute_load_query_with(query, move |load, plan| {
1034            load.execute_numeric_field_boundary(plan, target_field, request)
1035        })
1036    }
1037
1038    // Execute one fluent projection terminal through a query-owned output
1039    // shape after the session adapter has decoded any data keys into typed ids.
1040    pub(in crate::db) fn execute_fluent_projection_terminal<E>(
1041        &self,
1042        query: &Query<E>,
1043        strategy: PreparedFluentProjectionStrategy,
1044    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
1045    where
1046        E: PersistedRow<Canister = C> + EntityValue,
1047    {
1048        let (target_field, runtime_request) = strategy.into_runtime_parts();
1049
1050        match runtime_request {
1051            PreparedFluentProjectionRuntimeRequest::Values => self
1052                .execute_scalar_projection_boundary(
1053                    query,
1054                    target_field,
1055                    ScalarProjectionBoundaryRequest::Values,
1056                )?
1057                .into_values()
1058                .map(FluentProjectionTerminalOutput::Values)
1059                .map_err(QueryError::execute),
1060            PreparedFluentProjectionRuntimeRequest::DistinctValues => self
1061                .execute_scalar_projection_boundary(
1062                    query,
1063                    target_field,
1064                    ScalarProjectionBoundaryRequest::DistinctValues,
1065                )?
1066                .into_values()
1067                .map(FluentProjectionTerminalOutput::Values)
1068                .map_err(QueryError::execute),
1069            PreparedFluentProjectionRuntimeRequest::CountDistinct => self
1070                .execute_scalar_projection_boundary(
1071                    query,
1072                    target_field,
1073                    ScalarProjectionBoundaryRequest::CountDistinct,
1074                )?
1075                .into_count()
1076                .map(FluentProjectionTerminalOutput::Count)
1077                .map_err(QueryError::execute),
1078            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => self
1079                .execute_scalar_projection_boundary(
1080                    query,
1081                    target_field,
1082                    ScalarProjectionBoundaryRequest::ValuesWithIds,
1083                )?
1084                .into_values_with_ids::<E>()
1085                .map(FluentProjectionTerminalOutput::ValuesWithIds)
1086                .map_err(QueryError::execute),
1087            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => self
1088                .execute_scalar_projection_boundary(
1089                    query,
1090                    target_field,
1091                    ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind },
1092                )?
1093                .into_terminal_value()
1094                .map(FluentProjectionTerminalOutput::TerminalValue)
1095                .map_err(QueryError::execute),
1096        }
1097    }
1098
1099    // Execute the fluent `bytes()` terminal without leaking `LoadExecutor`
1100    // closure assembly into query fluent code.
1101    pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
1102    where
1103        E: PersistedRow<Canister = C> + EntityValue,
1104    {
1105        self.execute_load_query_with(query, |load, plan| load.bytes(plan))
1106    }
1107
1108    // Execute the fluent `bytes_by(field)` terminal at the session boundary.
1109    pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
1110        &self,
1111        query: &Query<E>,
1112        target_slot: FieldSlot,
1113    ) -> Result<u64, QueryError>
1114    where
1115        E: PersistedRow<Canister = C> + EntityValue,
1116    {
1117        self.execute_load_query_with(query, move |load, plan| {
1118            load.bytes_by_slot(plan, target_slot)
1119        })
1120    }
1121
1122    // Execute the fluent `take(k)` terminal at the session boundary.
1123    pub(in crate::db) fn execute_fluent_take<E>(
1124        &self,
1125        query: &Query<E>,
1126        take_count: u32,
1127    ) -> Result<EntityResponse<E>, QueryError>
1128    where
1129        E: PersistedRow<Canister = C> + EntityValue,
1130    {
1131        self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
1132    }
1133
1134    // Execute one row-returning fluent top/bottom-k terminal at the session boundary.
1135    pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
1136        &self,
1137        query: &Query<E>,
1138        target_slot: FieldSlot,
1139        take_count: u32,
1140        descending: bool,
1141    ) -> Result<EntityResponse<E>, QueryError>
1142    where
1143        E: PersistedRow<Canister = C> + EntityValue,
1144    {
1145        self.execute_load_query_with(query, move |load, plan| {
1146            if descending {
1147                load.top_k_by_slot(plan, target_slot, take_count)
1148            } else {
1149                load.bottom_k_by_slot(plan, target_slot, take_count)
1150            }
1151        })
1152    }
1153
1154    // Execute one value-returning fluent top/bottom-k terminal at the session boundary.
1155    pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
1156        &self,
1157        query: &Query<E>,
1158        target_slot: FieldSlot,
1159        take_count: u32,
1160        descending: bool,
1161    ) -> Result<Vec<Value>, QueryError>
1162    where
1163        E: PersistedRow<Canister = C> + EntityValue,
1164    {
1165        self.execute_load_query_with(query, move |load, plan| {
1166            if descending {
1167                load.top_k_by_values_slot(plan, target_slot, take_count)
1168            } else {
1169                load.bottom_k_by_values_slot(plan, target_slot, take_count)
1170            }
1171        })
1172    }
1173
1174    // Execute one id/value-returning fluent top/bottom-k terminal at the session boundary.
1175    pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
1176        &self,
1177        query: &Query<E>,
1178        target_slot: FieldSlot,
1179        take_count: u32,
1180        descending: bool,
1181    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
1182    where
1183        E: PersistedRow<Canister = C> + EntityValue,
1184    {
1185        self.execute_load_query_with(query, move |load, plan| {
1186            if descending {
1187                load.top_k_by_with_ids_slot(plan, target_slot, take_count)
1188            } else {
1189                load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
1190            }
1191        })
1192    }
1193
1194    /// Build one trace payload for a query without executing it.
1195    ///
1196    /// This lightweight surface is intended for developer diagnostics:
1197    /// plan hash, access strategy summary, and planner/executor route shape.
1198    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
1199    where
1200        E: EntityKind<Canister = C>,
1201    {
1202        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
1203        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
1204        let (prepared_plan, cache_attribution) =
1205            self.cached_prepared_query_plan_for_entity::<E>(query)?;
1206        let logical_plan = prepared_plan.logical_plan();
1207        let explain = logical_plan.explain();
1208        let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
1209        let executable_access = prepared_plan.access().executable_contract();
1210        let access_strategy = summarize_executable_access_plan(&executable_access);
1211        let execution_family = match query.mode() {
1212            QueryMode::Load(_) => Some(trace_execution_family_from_executor(
1213                prepared_plan
1214                    .execution_family()
1215                    .map_err(QueryError::execute)?,
1216            )),
1217            QueryMode::Delete(_) => None,
1218        };
1219        let reuse = query_plan_cache_reuse_event(cache_attribution);
1220
1221        Ok(QueryTracePlan::new(
1222            plan_hash,
1223            access_strategy,
1224            execution_family,
1225            reuse,
1226            explain,
1227        ))
1228    }
1229
1230    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
1231    pub(crate) fn execute_load_query_paged_with_trace<E>(
1232        &self,
1233        query: &Query<E>,
1234        cursor_token: Option<&str>,
1235    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
1236    where
1237        E: PersistedRow<Canister = C> + EntityValue,
1238    {
1239        // Phase 1: build/validate prepared execution plan and reject grouped plans.
1240        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1241        Self::ensure_scalar_paged_execution_family(
1242            plan.execution_family().map_err(QueryError::execute)?,
1243        )?;
1244
1245        // Phase 2: decode external cursor token and validate it against plan surface.
1246        let cursor_bytes = decode_optional_cursor_token(cursor_token)
1247            .map_err(QueryError::from_cursor_plan_error)?;
1248        let cursor = plan
1249            .prepare_cursor(cursor_bytes.as_deref())
1250            .map_err(query_error_from_executor_plan_error)?;
1251
1252        // Phase 3: execute one traced page and encode outbound continuation token.
1253        let (page, trace) = self
1254            .with_metrics(|| {
1255                self.load_executor::<E>()
1256                    .execute_paged_with_cursor_traced(plan, cursor)
1257            })
1258            .map_err(QueryError::execute)?;
1259        let next_cursor = page
1260            .next_cursor
1261            .map(|token| {
1262                let Some(token) = token.as_scalar() else {
1263                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
1264                };
1265
1266                token.encode().map_err(|err| {
1267                    QueryError::serialize_internal(format!(
1268                        "failed to serialize continuation cursor: {err}"
1269                    ))
1270                })
1271            })
1272            .transpose()?;
1273
1274        Ok(PagedLoadExecutionWithTrace::new(
1275            page.items,
1276            next_cursor,
1277            trace,
1278        ))
1279    }
1280
1281    /// Execute one grouped query page with optional grouped continuation cursor.
1282    ///
1283    /// This is the explicit grouped execution boundary; scalar load APIs reject
1284    /// grouped plans to preserve scalar response contracts.
1285    pub(in crate::db) fn execute_grouped<E>(
1286        &self,
1287        query: &Query<E>,
1288        cursor_token: Option<&str>,
1289    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1290    where
1291        E: PersistedRow<Canister = C> + EntityValue,
1292    {
1293        // Phase 1: build the prepared execution plan once from the typed query.
1294        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1295
1296        // Phase 2: reuse the shared prepared grouped execution path and then
1297        // finalize the outward grouped payload at the session boundary.
1298        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1299
1300        Self::finalize_grouped_execution_page(page, trace)
1301    }
1302
1303    // Execute one grouped prepared plan page with optional grouped cursor
1304    // while letting the caller choose the final grouped-runtime dispatch.
1305    fn execute_grouped_plan_with<E, T>(
1306        &self,
1307        plan: PreparedExecutionPlan<E>,
1308        cursor_token: Option<&str>,
1309        op: impl FnOnce(
1310            LoadExecutor<E>,
1311            PreparedExecutionPlan<E>,
1312            crate::db::cursor::GroupedPlannedCursor,
1313        ) -> Result<T, InternalError>,
1314    ) -> Result<T, QueryError>
1315    where
1316        E: PersistedRow<Canister = C> + EntityValue,
1317    {
1318        // Phase 1: validate the prepared plan shape before decoding cursors.
1319        Self::ensure_grouped_execution_family(
1320            plan.execution_family().map_err(QueryError::execute)?,
1321        )?;
1322
1323        // Phase 2: decode external grouped cursor token and validate against plan.
1324        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1325            .map_err(QueryError::from_cursor_plan_error)?;
1326        let cursor = plan
1327            .prepare_grouped_cursor_token(cursor)
1328            .map_err(query_error_from_executor_plan_error)?;
1329
1330        // Phase 3: execute one grouped page while preserving the structural
1331        // grouped cursor payload for whichever outward cursor format the caller needs.
1332        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1333            .map_err(QueryError::execute)
1334    }
1335
1336    // Execute one grouped prepared plan page with optional grouped cursor.
1337    fn execute_grouped_plan_with_trace<E>(
1338        &self,
1339        plan: PreparedExecutionPlan<E>,
1340        cursor_token: Option<&str>,
1341    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1342    where
1343        E: PersistedRow<Canister = C> + EntityValue,
1344    {
1345        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1346            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1347        })
1348    }
1349}
1350
1351impl QueryPlanCacheKey {
1352    // Assemble the canonical cache-key shell once so the test and
1353    // normalized-predicate constructors only decide which structural query key
1354    // they feed into the shared session cache identity.
1355    const fn from_authority_parts(
1356        authority: crate::db::executor::EntityAuthority,
1357        schema_fingerprint: CommitSchemaFingerprint,
1358        visibility: QueryPlanVisibility,
1359        structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1360        cache_method_version: u8,
1361    ) -> Self {
1362        Self {
1363            cache_method_version,
1364            entity_path: authority.entity_path(),
1365            schema_fingerprint,
1366            visibility,
1367            structural_query,
1368        }
1369    }
1370
1371    #[cfg(test)]
1372    fn for_authority_with_method_version(
1373        authority: crate::db::executor::EntityAuthority,
1374        schema_fingerprint: CommitSchemaFingerprint,
1375        visibility: QueryPlanVisibility,
1376        query: &StructuralQuery,
1377        cache_method_version: u8,
1378    ) -> Self {
1379        Self::from_authority_parts(
1380            authority,
1381            schema_fingerprint,
1382            visibility,
1383            query.structural_cache_key(),
1384            cache_method_version,
1385        )
1386    }
1387
1388    fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1389        authority: crate::db::executor::EntityAuthority,
1390        schema_fingerprint: CommitSchemaFingerprint,
1391        visibility: QueryPlanVisibility,
1392        query: &StructuralQuery,
1393        normalized_predicate_fingerprint: Option<[u8; 32]>,
1394        cache_method_version: u8,
1395    ) -> Self {
1396        Self::from_authority_parts(
1397            authority,
1398            schema_fingerprint,
1399            visibility,
1400            query.structural_cache_key_with_normalized_predicate_fingerprint(
1401                normalized_predicate_fingerprint,
1402            ),
1403            cache_method_version,
1404        )
1405    }
1406}