Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::executor::{
11    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
12};
13use crate::{
14    db::{
15        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
16        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
17        TraceExecutionFamily,
18        access::summarize_executable_access_plan,
19        cursor::{
20            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
21        },
22        diagnostics::ExecutionTrace,
23        executor::{
24            ExecutionFamily, ExecutorPlanError, GroupedCursorPage, LoadExecutor,
25            PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
26            ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
27            ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
28        },
29        query::builder::{
30            PreparedFluentAggregateExplainStrategy,
31            PreparedFluentExistingRowsTerminalRuntimeRequest,
32            PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldRuntimeRequest,
33            PreparedFluentNumericFieldStrategy, PreparedFluentOrderSensitiveTerminalRuntimeRequest,
34            PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
35            PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalRuntimeRequest,
36            PreparedFluentScalarTerminalStrategy,
37        },
38        query::explain::{
39            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40        },
41        query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42        query::{
43            intent::{CompiledQuery, PlannedQuery},
44            plan::{FieldSlot, QueryMode},
45        },
46        session::{finalize_grouped_paged_execution, finalize_scalar_paged_execution},
47    },
48    error::InternalError,
49    traits::{CanisterKind, EntityKind, EntityValue, Path},
50    types::{Decimal, Id},
51    value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62// Translate executor route-family selection into the query-owned trace label
63// at the session boundary so trace DTOs do not depend on executor types.
64const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65    match family {
66        ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67        ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68        ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69    }
70}
71
72// Convert executor plan-surface failures at the session boundary so query error
73// types do not import executor-owned error enums.
74pub(in crate::db::session) fn query_error_from_executor_plan_error(
75    err: ExecutorPlanError,
76) -> QueryError {
77    match err {
78        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79    }
80}
81
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution records the top-level compile/execute split for
/// typed/fluent query execution at the session boundary.
/// Every field is an additive counter where zero means no observed work or no
/// observed event for that bucket. Future non-additive diagnostics must use an
/// explicit presence type instead of relying on `Default`.
///
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    // Compile side. `execute_query_result_with_attribution` measures one
    // cached prepared-plan lookup/build and reports that same delta in both
    // fields.
    pub compile_local_instructions: u64,
    pub plan_lookup_local_instructions: u64,

    // Execute side, measured at the session boundary: the executor call and
    // the outward response finalization. Finalization is only measured on the
    // grouped path; scalar load/delete paths report zero.
    pub executor_invocation_local_instructions: u64,
    pub response_finalization_local_instructions: u64,

    // Executor-reported phase split. For grouped execution the runtime bucket
    // is the saturating sum of the grouped stream and fold buckets.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,

    // Scalar load phase counters copied from the executor's scalar phase
    // attribution; zero on grouped and delete paths.
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,

    // Grouped phase counters copied from the executor's grouped phase
    // attribution; zero on scalar paths.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // Flattened copy of the executor's `GroupedCountAttribution` counters.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // Response-decode cost returned by the attributed scalar load path;
    // reported as zero for grouped and delete execution.
    pub response_decode_local_instructions: u64,

    // Roll-ups (saturating): execute = executor invocation + response
    // finalization; total = compile + execute.
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,

    // Hit/miss counters taken from the cache attribution returned by the
    // prepared-plan lookup.
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
124
///
/// QueryExecutePhaseAttribution
///
/// Module-private carrier for the execute-phase counters gathered while
/// running one query. `execute_query_result_with_attribution` flattens it
/// field by field into the public `QueryExecutionAttribution`.
///
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    // Measured at the session boundary around the executor call and the
    // outward response finalization.
    executor_invocation_local_instructions: u64,
    response_finalization_local_instructions: u64,
    // Executor-reported runtime/finalize split.
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    // Scalar load phase counters (zero for grouped execution).
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    // Grouped phase counters (zero for scalar execution).
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    // Executor-owned grouped-count counters, kept nested until flattening.
    grouped_count: GroupedCountAttribution,
}
144
// Read the instruction counter that the attribution helpers diff to price one
// query stage.
// On wasm32 this reads `canic_cdk::api::performance_counter(1)`; on every
// other target it returns 0, so measured deltas are always 0 off-wasm.
// NOTE(review): counter type `1` is presumably the call-context instruction
// counter — confirm against the CDK documentation.
#[cfg(feature = "diagnostics")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    // Exactly one of the two cfg-gated blocks survives compilation and becomes
    // the function's tail expression.
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
161
// Run one fallible stage and report how many local instructions it consumed.
// Returns `(delta, result)`; the stage result is passed through untouched, so
// callers decide when to propagate an error.
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let counter_before = read_query_local_instruction_counter();
    let outcome = run();
    let counter_after = read_query_local_instruction_counter();

    // Saturate so a counter that did not advance can never underflow.
    (counter_after.saturating_sub(counter_before), outcome)
}
170
171impl<C: CanisterKind> DbSession<C> {
    // Build the all-zero execute-phase attribution. It serves as the
    // struct-update base for paths that only report a subset of counters (the
    // delete path fills in the executor invocation cost and nothing else).
    // Spelled out field by field because this is a `const fn`, where the
    // derived `Default` impl cannot be called.
    #[cfg(feature = "diagnostics")]
    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
        QueryExecutePhaseAttribution {
            executor_invocation_local_instructions: 0,
            response_finalization_local_instructions: 0,
            runtime_local_instructions: 0,
            finalize_local_instructions: 0,
            direct_data_row_scan_local_instructions: 0,
            direct_data_row_key_stream_local_instructions: 0,
            direct_data_row_row_read_local_instructions: 0,
            direct_data_row_key_encode_local_instructions: 0,
            direct_data_row_store_get_local_instructions: 0,
            direct_data_row_order_window_local_instructions: 0,
            direct_data_row_page_window_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
192
193    #[cfg(feature = "diagnostics")]
194    const fn scalar_query_execute_phase_attribution(
195        phase: ScalarExecutePhaseAttribution,
196        executor_invocation_local_instructions: u64,
197    ) -> QueryExecutePhaseAttribution {
198        QueryExecutePhaseAttribution {
199            executor_invocation_local_instructions,
200            response_finalization_local_instructions: 0,
201            runtime_local_instructions: phase.runtime_local_instructions,
202            finalize_local_instructions: phase.finalize_local_instructions,
203            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
204            direct_data_row_key_stream_local_instructions: phase
205                .direct_data_row_key_stream_local_instructions,
206            direct_data_row_row_read_local_instructions: phase
207                .direct_data_row_row_read_local_instructions,
208            direct_data_row_key_encode_local_instructions: phase
209                .direct_data_row_key_encode_local_instructions,
210            direct_data_row_store_get_local_instructions: phase
211                .direct_data_row_store_get_local_instructions,
212            direct_data_row_order_window_local_instructions: phase
213                .direct_data_row_order_window_local_instructions,
214            direct_data_row_page_window_local_instructions: phase
215                .direct_data_row_page_window_local_instructions,
216            grouped_stream_local_instructions: 0,
217            grouped_fold_local_instructions: 0,
218            grouped_finalize_local_instructions: 0,
219            grouped_count: GroupedCountAttribution::none(),
220        }
221    }
222
223    #[cfg(feature = "diagnostics")]
224    const fn grouped_query_execute_phase_attribution(
225        phase: GroupedExecutePhaseAttribution,
226        executor_invocation_local_instructions: u64,
227        response_finalization_local_instructions: u64,
228    ) -> QueryExecutePhaseAttribution {
229        QueryExecutePhaseAttribution {
230            executor_invocation_local_instructions,
231            response_finalization_local_instructions,
232            runtime_local_instructions: phase
233                .stream_local_instructions
234                .saturating_add(phase.fold_local_instructions),
235            finalize_local_instructions: phase.finalize_local_instructions,
236            direct_data_row_scan_local_instructions: 0,
237            direct_data_row_key_stream_local_instructions: 0,
238            direct_data_row_row_read_local_instructions: 0,
239            direct_data_row_key_encode_local_instructions: 0,
240            direct_data_row_store_get_local_instructions: 0,
241            direct_data_row_order_window_local_instructions: 0,
242            direct_data_row_page_window_local_instructions: 0,
243            grouped_stream_local_instructions: phase.stream_local_instructions,
244            grouped_fold_local_instructions: phase.fold_local_instructions,
245            grouped_finalize_local_instructions: phase.finalize_local_instructions,
246            grouped_count: phase.grouped_count,
247        }
248    }
249
    // Compile one typed query using only the indexes currently visible for the
    // query's recovered store.
    // Delegates to `map_cached_shared_query_plan_for_entity`, converting the
    // resulting plan with `CompiledQuery::<E>::from_plan`; any planning error
    // surfaces as `QueryError`.
    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<CompiledQuery<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
    }
261
    // Build one logical planned-query shell using only the indexes currently
    // visible for the query's recovered store.
    // Same delegation as `compile_query_with_visible_indexes`, but the plan is
    // converted with `PlannedQuery::<E>::from_plan` instead.
    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<PlannedQuery<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
    }
273
    // Project one logical explain payload using only planner-visible indexes.
    // Delegates to `with_query_visible_indexes`, which invokes
    // `Query::<E>::explain_with_visible_indexes` with the query and the
    // visible index set it resolved.
    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainPlan, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
    }
284
    // Hash one typed query plan using only the indexes currently visible for
    // the query's recovered store.
    // Returns the `String` produced by
    // `Query::<E>::plan_hash_hex_with_visible_indexes`, invoked through
    // `with_query_visible_indexes`.
    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
    }
296
    // Explain one load execution shape using only planner-visible
    // indexes from the recovered store state.
    // Returns the execution node descriptor produced by
    // `Query::<E>::explain_execution_with_visible_indexes`, invoked through
    // `with_query_visible_indexes`.
    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
    }
308
309    // Render one load execution descriptor plus route diagnostics using
310    // only planner-visible indexes from the recovered store state.
311    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
312        &self,
313        query: &Query<E>,
314    ) -> Result<String, QueryError>
315    where
316        E: EntityValue + EntityKind<Canister = C>,
317    {
318        self.with_query_visible_indexes(query, |query, visible_indexes| {
319            let (prepared_plan, cache_attribution) =
320                self.cached_prepared_query_plan_for_entity(query)?;
321            let mut plan = prepared_plan.logical_plan().clone();
322
323            // Freeze the same planner-owned explain access-choice snapshot used
324            // by the direct non-cached explain path before rendering verbose
325            // diagnostics from the reused logical plan.
326            plan.finalize_access_choice_for_model_with_indexes(
327                query.structural().model(),
328                visible_indexes.as_slice(),
329            );
330
331            query
332                .structural()
333                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
334                    &plan,
335                    Some(query_plan_cache_reuse_event(cache_attribution)),
336                    |_| {},
337                )
338                .map(|diagnostics| diagnostics.render_text_verbose())
339        })
340    }
341
342    // Explain one prepared fluent aggregate terminal using only
343    // planner-visible indexes from the recovered store state.
344    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
345        &self,
346        query: &Query<E>,
347        strategy: &S,
348    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
349    where
350        E: EntityValue + EntityKind<Canister = C>,
351        S: PreparedFluentAggregateExplainStrategy,
352    {
353        self.with_query_visible_indexes(query, |query, visible_indexes| {
354            query
355                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
356        })
357    }
358
359    // Explain one `bytes_by(field)` terminal using only planner-visible
360    // indexes from the recovered store state.
361    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
362        &self,
363        query: &Query<E>,
364        target_field: &str,
365    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
366    where
367        E: EntityValue + EntityKind<Canister = C>,
368    {
369        self.with_query_visible_indexes(query, |query, visible_indexes| {
370            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
371        })
372    }
373
374    // Explain one prepared fluent projection terminal using only
375    // planner-visible indexes from the recovered store state.
376    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
377        &self,
378        query: &Query<E>,
379        strategy: &PreparedFluentProjectionStrategy,
380    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
381    where
382        E: EntityValue + EntityKind<Canister = C>,
383    {
384        self.with_query_visible_indexes(query, |query, visible_indexes| {
385            query.explain_prepared_projection_terminal_with_visible_indexes(
386                visible_indexes,
387                strategy,
388            )
389        })
390    }
391
392    // Validate that one execution strategy is admissible for scalar paged load
393    // execution and fail closed on grouped/primary-key-only routes.
394    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
395        match family {
396            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
397                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
398            )),
399            ExecutionFamily::Ordered => Ok(()),
400            ExecutionFamily::Grouped => Err(QueryError::invariant(
401                "grouped queries execute via execute(), not page().execute()",
402            )),
403        }
404    }
405
406    // Validate that one execution strategy is admissible for the grouped
407    // execution surface.
408    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
409        match family {
410            ExecutionFamily::Grouped => Ok(()),
411            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
412                "grouped execution requires grouped logical plans",
413            )),
414        }
415    }
416
    // Finalize one grouped cursor page into the outward grouped execution
    // payload so grouped cursor encoding and continuation-shape validation
    // stay owned by the session boundary.
    // Pure pass-through to `finalize_grouped_paged_execution`; `trace` is the
    // optional execution trace forwarded alongside the page.
    fn finalize_grouped_execution_page(
        page: GroupedCursorPage,
        trace: Option<ExecutionTrace>,
    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
        finalize_grouped_paged_execution(page, trace)
    }
426
427    /// Execute one scalar load/delete query and return materialized response rows.
428    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
429    where
430        E: PersistedRow<Canister = C> + EntityValue,
431    {
432        // Phase 1: compile typed intent into one prepared execution-plan contract.
433        let mode = query.mode();
434        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
435
436        // Phase 2: delegate execution to the shared compiled-plan entry path.
437        self.execute_query_dyn(mode, plan)
438    }
439
    /// Execute one typed query while reporting the compile/execute split at
    /// the shared fluent query seam.
    ///
    /// Returns the unified row/grouped result together with a
    /// `QueryExecutionAttribution` whose counters come from three sources:
    /// the measured plan lookup, the measured executor invocation (plus
    /// grouped response finalization), and the executor's own phase
    /// attribution. Any planning or execution failure is returned as
    /// `QueryError` and no attribution is produced.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    #[expect(
        clippy::too_many_lines,
        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
    )]
    #[expect(
        clippy::needless_update,
        reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
    )]
    pub fn execute_query_result_with_attribution<E>(
        &self,
        query: &Query<E>,
    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure compile work at the typed/fluent boundary,
        // including the shared lower query-plan cache lookup/build exactly
        // once. This preserves honest hit/miss attribution without
        // double-building plans on one-shot cache misses.
        let (plan_lookup_local_instructions, plan_and_cache) =
            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
        let (plan, cache_attribution) = plan_and_cache?;
        // Compile cost is currently exactly the plan lookup/build cost.
        let compile_local_instructions = plan_lookup_local_instructions;

        // Phase 2: execute one query result using the prepared plan produced
        // by the compile/cache boundary above.
        // The closure is invoked immediately so `?` inside any branch
        // short-circuits into one `Result`. The tuple's trailing `u64` is the
        // response-decode instruction count.
        let result =
            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
                if query.has_grouping() {
                    // Grouped path: measure the executor call and the outward
                    // page finalization as two separate stages.
                    let (executor_invocation_local_instructions, grouped_page) =
                        measure_query_stage(|| {
                            self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
                                executor
                                    .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
                                        plan, cursor,
                                    )
                            })
                        });
                    let (page, trace, phase_attribution) = grouped_page?;
                    let (response_finalization_local_instructions, grouped) =
                        measure_query_stage(|| Self::finalize_grouped_execution_page(page, trace));
                    let grouped = grouped?;

                    Ok((
                        LoadQueryResult::Grouped(grouped),
                        Self::grouped_query_execute_phase_attribution(
                            phase_attribution,
                            executor_invocation_local_instructions,
                            response_finalization_local_instructions,
                        ),
                        // Grouped execution reports no separate response-decode cost.
                        0,
                    ))
                } else {
                    match query.mode() {
                        QueryMode::Load(_) => {
                            // Scalar load: the attributed executor entry point
                            // returns rows, phase counters, and decode cost.
                            let (executor_invocation_local_instructions, executed) =
                                measure_query_stage(|| {
                                    self.load_executor::<E>()
                                        .execute_with_phase_attribution(plan)
                                        .map_err(QueryError::execute)
                                });
                            let (rows, phase_attribution, response_decode_local_instructions) =
                                executed?;

                            Ok((
                                LoadQueryResult::Rows(rows),
                                Self::scalar_query_execute_phase_attribution(
                                    phase_attribution,
                                    executor_invocation_local_instructions,
                                ),
                                response_decode_local_instructions,
                            ))
                        }
                        QueryMode::Delete(_) => {
                            // Delete goes through the regular shared execution
                            // entry path, so only the whole invocation is
                            // measured and every other bucket stays zero.
                            let (executor_invocation_local_instructions, result) =
                                measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
                            let result = result?;

                            Ok((
                                LoadQueryResult::Rows(result),
                                QueryExecutePhaseAttribution {
                                    executor_invocation_local_instructions,
                                    ..Self::empty_query_execute_phase_attribution()
                                },
                                0,
                            ))
                        }
                    }
                }
            }();
        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
        // Roll-ups: execute = executor invocation + response finalization;
        // total = compile + execute. Both saturate instead of overflowing.
        let execute_local_instructions = execute_phase_attribution
            .executor_invocation_local_instructions
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);

        // Phase 3: flatten the private phase attribution into the public
        // attribution payload, one field at a time.
        Ok((
            result,
            QueryExecutionAttribution {
                compile_local_instructions,
                plan_lookup_local_instructions,
                executor_invocation_local_instructions: execute_phase_attribution
                    .executor_invocation_local_instructions,
                response_finalization_local_instructions: execute_phase_attribution
                    .response_finalization_local_instructions,
                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
                direct_data_row_scan_local_instructions: execute_phase_attribution
                    .direct_data_row_scan_local_instructions,
                direct_data_row_key_stream_local_instructions: execute_phase_attribution
                    .direct_data_row_key_stream_local_instructions,
                direct_data_row_row_read_local_instructions: execute_phase_attribution
                    .direct_data_row_row_read_local_instructions,
                direct_data_row_key_encode_local_instructions: execute_phase_attribution
                    .direct_data_row_key_encode_local_instructions,
                direct_data_row_store_get_local_instructions: execute_phase_attribution
                    .direct_data_row_store_get_local_instructions,
                direct_data_row_order_window_local_instructions: execute_phase_attribution
                    .direct_data_row_order_window_local_instructions,
                direct_data_row_page_window_local_instructions: execute_phase_attribution
                    .direct_data_row_page_window_local_instructions,
                grouped_stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                grouped_fold_local_instructions: execute_phase_attribution
                    .grouped_fold_local_instructions,
                grouped_finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                grouped_count_borrowed_hash_computations: execute_phase_attribution
                    .grouped_count
                    .borrowed_hash_computations,
                grouped_count_bucket_candidate_checks: execute_phase_attribution
                    .grouped_count
                    .bucket_candidate_checks,
                grouped_count_existing_group_hits: execute_phase_attribution
                    .grouped_count
                    .existing_group_hits,
                grouped_count_new_group_inserts: execute_phase_attribution
                    .grouped_count
                    .new_group_inserts,
                grouped_count_row_materialization_local_instructions: execute_phase_attribution
                    .grouped_count
                    .row_materialization_local_instructions,
                grouped_count_group_lookup_local_instructions: execute_phase_attribution
                    .grouped_count
                    .group_lookup_local_instructions,
                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
                    .grouped_count
                    .existing_group_update_local_instructions,
                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
                    .grouped_count
                    .new_group_insert_local_instructions,
                response_decode_local_instructions,
                execute_local_instructions,
                total_local_instructions,
                shared_query_plan_cache_hits: cache_attribution.hits,
                shared_query_plan_cache_misses: cache_attribution.misses,
                ..QueryExecutionAttribution::default()
            },
        ))
    }
605
606    // Execute one typed query through the unified row/grouped result surface so
607    // higher layers do not need to branch on grouped shape themselves.
608    #[doc(hidden)]
609    pub fn execute_query_result<E>(
610        &self,
611        query: &Query<E>,
612    ) -> Result<LoadQueryResult<E>, QueryError>
613    where
614        E: PersistedRow<Canister = C> + EntityValue,
615    {
616        if query.has_grouping() {
617            return self
618                .execute_grouped(query, None)
619                .map(LoadQueryResult::Grouped);
620        }
621
622        self.execute_query(query).map(LoadQueryResult::Rows)
623    }
624
625    /// Execute one typed delete query and return only the affected-row count.
626    #[doc(hidden)]
627    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
628    where
629        E: PersistedRow<Canister = C> + EntityValue,
630    {
631        // Phase 1: fail closed if the caller routes a non-delete query here.
632        if !query.mode().is_delete() {
633            return Err(QueryError::unsupported_query(
634                "delete count execution requires delete query mode",
635            ));
636        }
637
638        // Phase 2: resolve one cached prepared execution-plan contract directly
639        // from the shared lower boundary instead of rebuilding it through the
640        // typed compiled-query wrapper.
641        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
642
643        // Phase 3: execute the shared delete core while skipping response-row materialization.
644        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
645            .map_err(QueryError::execute)
646    }
647
648    /// Execute one scalar query from one pre-built prepared execution contract.
649    ///
650    /// This is the shared compiled-plan entry boundary used by the typed
651    /// `execute_query(...)` surface and adjacent query execution facades.
652    pub(in crate::db) fn execute_query_dyn<E>(
653        &self,
654        mode: QueryMode,
655        plan: PreparedExecutionPlan<E>,
656    ) -> Result<EntityResponse<E>, QueryError>
657    where
658        E: PersistedRow<Canister = C> + EntityValue,
659    {
660        let result = match mode {
661            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
662            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
663        };
664
665        result.map_err(QueryError::execute)
666    }
667
668    // Shared load-query terminal wrapper: build plan, run under metrics, map
669    // execution errors into query-facing errors.
670    pub(in crate::db) fn execute_load_query_with<E, T>(
671        &self,
672        query: &Query<E>,
673        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
674    ) -> Result<T, QueryError>
675    where
676        E: PersistedRow<Canister = C> + EntityValue,
677    {
678        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
679
680        self.with_metrics(|| op(self.load_executor::<E>(), plan))
681            .map_err(QueryError::execute)
682    }
683
684    // Execute one scalar terminal boundary and keep the executor-specific
685    // request/output types contained inside the session adapter.
686    fn execute_scalar_terminal_boundary<E>(
687        &self,
688        query: &Query<E>,
689        request: ScalarTerminalBoundaryRequest,
690    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
691    where
692        E: PersistedRow<Canister = C> + EntityValue,
693    {
694        self.execute_load_query_with(query, move |load, plan| {
695            load.execute_scalar_terminal_request(plan, request)
696        })
697    }
698
699    // Execute one projection terminal boundary and keep field projection
700    // executor details out of fluent query modules.
701    fn execute_scalar_projection_boundary<E>(
702        &self,
703        query: &Query<E>,
704        target_field: FieldSlot,
705        request: ScalarProjectionBoundaryRequest,
706    ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
707    where
708        E: PersistedRow<Canister = C> + EntityValue,
709    {
710        self.execute_load_query_with(query, move |load, plan| {
711            load.execute_scalar_projection_boundary(plan, target_field, request)
712        })
713    }
714
715    // Execute one fluent count/exists terminal through a query-owned result
716    // shape so fluent terminals do not import executor aggregate outputs.
717    pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
718        &self,
719        query: &Query<E>,
720        strategy: PreparedFluentExistingRowsTerminalStrategy,
721    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
722    where
723        E: PersistedRow<Canister = C> + EntityValue,
724    {
725        match strategy.into_runtime_request() {
726            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => self
727                .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Count)?
728                .into_count()
729                .map(FluentScalarTerminalOutput::Count)
730                .map_err(QueryError::execute),
731            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => self
732                .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Exists)?
733                .into_exists()
734                .map(FluentScalarTerminalOutput::Exists)
735                .map_err(QueryError::execute),
736        }
737    }
738
739    // Execute one fluent id/extrema terminal through a query-owned result
740    // shape after the session adapter has decoded storage keys into typed ids.
741    pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
742        &self,
743        query: &Query<E>,
744        strategy: PreparedFluentScalarTerminalStrategy,
745    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
746    where
747        E: PersistedRow<Canister = C> + EntityValue,
748    {
749        let request = match strategy.into_runtime_request() {
750            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
751                ScalarTerminalBoundaryRequest::IdTerminal { kind }
752            }
753            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
754                ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
755            }
756        };
757
758        self.execute_scalar_terminal_boundary(query, request)?
759            .into_id::<E>()
760            .map(FluentScalarTerminalOutput::Id)
761            .map_err(QueryError::execute)
762    }
763
764    // Execute one fluent order-sensitive terminal through the session adapter.
765    // The min/max pair request remains distinguished because it returns two ids.
766    pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
767        &self,
768        query: &Query<E>,
769        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
770    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
771    where
772        E: PersistedRow<Canister = C> + EntityValue,
773    {
774        match strategy.into_runtime_request() {
775            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => self
776                .execute_scalar_terminal_boundary(
777                    query,
778                    ScalarTerminalBoundaryRequest::IdTerminal { kind },
779                )?
780                .into_id::<E>()
781                .map(FluentScalarTerminalOutput::Id)
782                .map_err(QueryError::execute),
783            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
784                self.execute_scalar_terminal_boundary(
785                    query,
786                    ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth },
787                )?
788                .into_id::<E>()
789                .map(FluentScalarTerminalOutput::Id)
790                .map_err(QueryError::execute)
791            }
792            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
793                self.execute_scalar_terminal_boundary(
794                    query,
795                    ScalarTerminalBoundaryRequest::MedianBySlot { target_field },
796                )?
797                .into_id::<E>()
798                .map(FluentScalarTerminalOutput::Id)
799                .map_err(QueryError::execute)
800            }
801            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
802                self.execute_scalar_terminal_boundary(
803                    query,
804                    ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field },
805                )?
806                .into_id_pair::<E>()
807                .map(FluentScalarTerminalOutput::IdPair)
808                .map_err(QueryError::execute)
809            }
810        }
811    }
812
813    // Execute one fluent numeric-field terminal through the session-owned
814    // request conversion layer.
815    pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
816        &self,
817        query: &Query<E>,
818        strategy: PreparedFluentNumericFieldStrategy,
819    ) -> Result<Option<Decimal>, QueryError>
820    where
821        E: PersistedRow<Canister = C> + EntityValue,
822    {
823        let (target_field, runtime_request) = strategy.into_runtime_parts();
824        let request = match runtime_request {
825            PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
826            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
827                ScalarNumericFieldBoundaryRequest::SumDistinct
828            }
829            PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
830            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
831                ScalarNumericFieldBoundaryRequest::AvgDistinct
832            }
833        };
834
835        self.execute_load_query_with(query, move |load, plan| {
836            load.execute_numeric_field_boundary(plan, target_field, request)
837        })
838    }
839
840    // Execute one fluent projection terminal through a query-owned output
841    // shape after the session adapter has decoded any data keys into typed ids.
842    pub(in crate::db) fn execute_fluent_projection_terminal<E>(
843        &self,
844        query: &Query<E>,
845        strategy: PreparedFluentProjectionStrategy,
846    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
847    where
848        E: PersistedRow<Canister = C> + EntityValue,
849    {
850        let (target_field, runtime_request) = strategy.into_runtime_parts();
851
852        match runtime_request {
853            PreparedFluentProjectionRuntimeRequest::Values => self
854                .execute_scalar_projection_boundary(
855                    query,
856                    target_field,
857                    ScalarProjectionBoundaryRequest::Values,
858                )?
859                .into_values()
860                .map(FluentProjectionTerminalOutput::Values)
861                .map_err(QueryError::execute),
862            PreparedFluentProjectionRuntimeRequest::DistinctValues => self
863                .execute_scalar_projection_boundary(
864                    query,
865                    target_field,
866                    ScalarProjectionBoundaryRequest::DistinctValues,
867                )?
868                .into_values()
869                .map(FluentProjectionTerminalOutput::Values)
870                .map_err(QueryError::execute),
871            PreparedFluentProjectionRuntimeRequest::CountDistinct => self
872                .execute_scalar_projection_boundary(
873                    query,
874                    target_field,
875                    ScalarProjectionBoundaryRequest::CountDistinct,
876                )?
877                .into_count()
878                .map(FluentProjectionTerminalOutput::Count)
879                .map_err(QueryError::execute),
880            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => self
881                .execute_scalar_projection_boundary(
882                    query,
883                    target_field,
884                    ScalarProjectionBoundaryRequest::ValuesWithIds,
885                )?
886                .into_values_with_ids::<E>()
887                .map(FluentProjectionTerminalOutput::ValuesWithIds)
888                .map_err(QueryError::execute),
889            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => self
890                .execute_scalar_projection_boundary(
891                    query,
892                    target_field,
893                    ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind },
894                )?
895                .into_terminal_value()
896                .map(FluentProjectionTerminalOutput::TerminalValue)
897                .map_err(QueryError::execute),
898        }
899    }
900
901    // Execute the fluent `bytes()` terminal without leaking `LoadExecutor`
902    // closure assembly into query fluent code.
903    pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
904    where
905        E: PersistedRow<Canister = C> + EntityValue,
906    {
907        self.execute_load_query_with(query, |load, plan| load.bytes(plan))
908    }
909
910    // Execute the fluent `bytes_by(field)` terminal at the session boundary.
911    pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
912        &self,
913        query: &Query<E>,
914        target_slot: FieldSlot,
915    ) -> Result<u64, QueryError>
916    where
917        E: PersistedRow<Canister = C> + EntityValue,
918    {
919        self.execute_load_query_with(query, move |load, plan| {
920            load.bytes_by_slot(plan, target_slot)
921        })
922    }
923
924    // Execute the fluent `take(k)` terminal at the session boundary.
925    pub(in crate::db) fn execute_fluent_take<E>(
926        &self,
927        query: &Query<E>,
928        take_count: u32,
929    ) -> Result<EntityResponse<E>, QueryError>
930    where
931        E: PersistedRow<Canister = C> + EntityValue,
932    {
933        self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
934    }
935
936    // Execute one row-returning fluent top/bottom-k terminal at the session boundary.
937    pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
938        &self,
939        query: &Query<E>,
940        target_slot: FieldSlot,
941        take_count: u32,
942        descending: bool,
943    ) -> Result<EntityResponse<E>, QueryError>
944    where
945        E: PersistedRow<Canister = C> + EntityValue,
946    {
947        self.execute_load_query_with(query, move |load, plan| {
948            if descending {
949                load.top_k_by_slot(plan, target_slot, take_count)
950            } else {
951                load.bottom_k_by_slot(plan, target_slot, take_count)
952            }
953        })
954    }
955
956    // Execute one value-returning fluent top/bottom-k terminal at the session boundary.
957    pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
958        &self,
959        query: &Query<E>,
960        target_slot: FieldSlot,
961        take_count: u32,
962        descending: bool,
963    ) -> Result<Vec<Value>, QueryError>
964    where
965        E: PersistedRow<Canister = C> + EntityValue,
966    {
967        self.execute_load_query_with(query, move |load, plan| {
968            if descending {
969                load.top_k_by_values_slot(plan, target_slot, take_count)
970            } else {
971                load.bottom_k_by_values_slot(plan, target_slot, take_count)
972            }
973        })
974    }
975
976    // Execute one id/value-returning fluent top/bottom-k terminal at the session boundary.
977    pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
978        &self,
979        query: &Query<E>,
980        target_slot: FieldSlot,
981        take_count: u32,
982        descending: bool,
983    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
984    where
985        E: PersistedRow<Canister = C> + EntityValue,
986    {
987        self.execute_load_query_with(query, move |load, plan| {
988            if descending {
989                load.top_k_by_with_ids_slot(plan, target_slot, take_count)
990            } else {
991                load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
992            }
993        })
994    }
995
996    /// Build one trace payload for a query without executing it.
997    ///
998    /// This lightweight surface is intended for developer diagnostics:
999    /// plan hash, access strategy summary, and planner/executor route shape.
1000    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
1001    where
1002        E: EntityKind<Canister = C>,
1003    {
1004        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
1005        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
1006        let (prepared_plan, cache_attribution) =
1007            self.cached_prepared_query_plan_for_entity::<E>(query)?;
1008        let logical_plan = prepared_plan.logical_plan();
1009        let explain = logical_plan.explain();
1010        let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
1011        let executable_access = prepared_plan.access().executable_contract();
1012        let access_strategy = summarize_executable_access_plan(&executable_access);
1013        let execution_family = match query.mode() {
1014            QueryMode::Load(_) => Some(trace_execution_family_from_executor(
1015                prepared_plan
1016                    .execution_family()
1017                    .map_err(QueryError::execute)?,
1018            )),
1019            QueryMode::Delete(_) => None,
1020        };
1021        let reuse = query_plan_cache_reuse_event(cache_attribution);
1022
1023        Ok(QueryTracePlan::new(
1024            plan_hash,
1025            access_strategy,
1026            execution_family,
1027            reuse,
1028            explain,
1029        ))
1030    }
1031
1032    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
1033    pub(crate) fn execute_load_query_paged_with_trace<E>(
1034        &self,
1035        query: &Query<E>,
1036        cursor_token: Option<&str>,
1037    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
1038    where
1039        E: PersistedRow<Canister = C> + EntityValue,
1040    {
1041        // Phase 1: build/validate prepared execution plan and reject grouped plans.
1042        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1043        Self::ensure_scalar_paged_execution_family(
1044            plan.execution_family().map_err(QueryError::execute)?,
1045        )?;
1046
1047        // Phase 2: decode external cursor token and validate it against plan surface.
1048        let cursor_bytes = decode_optional_cursor_token(cursor_token)
1049            .map_err(QueryError::from_cursor_plan_error)?;
1050        let cursor = plan
1051            .prepare_cursor(cursor_bytes.as_deref())
1052            .map_err(query_error_from_executor_plan_error)?;
1053
1054        // Phase 3: execute one traced page and encode outbound continuation token.
1055        let (page, trace) = self
1056            .with_metrics(|| {
1057                self.load_executor::<E>()
1058                    .execute_paged_with_cursor_traced(plan, cursor)
1059            })
1060            .map_err(QueryError::execute)?;
1061        finalize_scalar_paged_execution(page, trace)
1062    }
1063
1064    /// Execute one grouped query page with optional grouped continuation cursor.
1065    ///
1066    /// This is the explicit grouped execution boundary; scalar load APIs reject
1067    /// grouped plans to preserve scalar response contracts.
1068    pub(in crate::db) fn execute_grouped<E>(
1069        &self,
1070        query: &Query<E>,
1071        cursor_token: Option<&str>,
1072    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1073    where
1074        E: PersistedRow<Canister = C> + EntityValue,
1075    {
1076        // Phase 1: build the prepared execution plan once from the typed query.
1077        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1078
1079        // Phase 2: reuse the shared prepared grouped execution path and then
1080        // finalize the outward grouped payload at the session boundary.
1081        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1082
1083        Self::finalize_grouped_execution_page(page, trace)
1084    }
1085
1086    // Execute one grouped prepared plan page with optional grouped cursor
1087    // while letting the caller choose the final grouped-runtime dispatch.
1088    fn execute_grouped_plan_with<E, T>(
1089        &self,
1090        plan: PreparedExecutionPlan<E>,
1091        cursor_token: Option<&str>,
1092        op: impl FnOnce(
1093            LoadExecutor<E>,
1094            PreparedExecutionPlan<E>,
1095            crate::db::cursor::GroupedPlannedCursor,
1096        ) -> Result<T, InternalError>,
1097    ) -> Result<T, QueryError>
1098    where
1099        E: PersistedRow<Canister = C> + EntityValue,
1100    {
1101        // Phase 1: validate the prepared plan shape before decoding cursors.
1102        Self::ensure_grouped_execution_family(
1103            plan.execution_family().map_err(QueryError::execute)?,
1104        )?;
1105
1106        // Phase 2: decode external grouped cursor token and validate against plan.
1107        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1108            .map_err(QueryError::from_cursor_plan_error)?;
1109        let cursor = plan
1110            .prepare_grouped_cursor_token(cursor)
1111            .map_err(query_error_from_executor_plan_error)?;
1112
1113        // Phase 3: execute one grouped page while preserving the structural
1114        // grouped cursor payload for whichever outward cursor format the caller needs.
1115        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1116            .map_err(QueryError::execute)
1117    }
1118
1119    // Execute one grouped prepared plan page with optional grouped cursor.
1120    fn execute_grouped_plan_with_trace<E>(
1121        &self,
1122        plan: PreparedExecutionPlan<E>,
1123        cursor_token: Option<&str>,
1124    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1125    where
1126        E: PersistedRow<Canister = C> + EntityValue,
1127    {
1128        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1129            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1130        })
1131    }
1132}