Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::executor::{
11    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
12};
13use crate::{
14    db::{
15        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
16        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
17        TraceExecutionFamily,
18        access::summarize_executable_access_plan,
19        cursor::{
20            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
21        },
22        diagnostics::ExecutionTrace,
23        executor::{
24            ExecutionFamily, ExecutorPlanError, GroupedCursorPage, LoadExecutor,
25            PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
26            ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
27            ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
28        },
29        query::builder::{
30            PreparedFluentAggregateExplainStrategy,
31            PreparedFluentExistingRowsTerminalRuntimeRequest,
32            PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldRuntimeRequest,
33            PreparedFluentNumericFieldStrategy, PreparedFluentOrderSensitiveTerminalRuntimeRequest,
34            PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
35            PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalRuntimeRequest,
36            PreparedFluentScalarTerminalStrategy,
37        },
38        query::explain::{
39            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40        },
41        query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42        query::{
43            intent::{CompiledQuery, PlannedQuery},
44            plan::{FieldSlot, QueryMode},
45        },
46        session::{finalize_grouped_paged_execution, finalize_scalar_paged_execution},
47    },
48    error::InternalError,
49    traits::{CanisterKind, EntityKind, EntityValue, Path},
50    types::{Decimal, Id},
51    value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62// Translate executor route-family selection into the query-owned trace label
63// at the session boundary so trace DTOs do not depend on executor types.
64const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65    match family {
66        ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67        ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68        ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69    }
70}
71
72// Convert executor plan-surface failures at the session boundary so query error
73// types do not import executor-owned error enums.
74pub(in crate::db::session) fn query_error_from_executor_plan_error(
75    err: ExecutorPlanError,
76) -> QueryError {
77    match err {
78        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79    }
80}
81
///
/// QueryExecutionAttribution
///
/// QueryExecutionAttribution records the top-level compile/execute split for
/// typed/fluent query execution at the session boundary.
///
/// Values are filled in by `execute_query_result_with_attribution`; counters
/// that a given execution path does not report are left at zero.
///
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    // Compile side. The attribution path sets `compile_local_instructions`
    // equal to `plan_lookup_local_instructions` (the measured cached
    // prepared-plan lookup/build).
    pub compile_local_instructions: u64,
    pub plan_lookup_local_instructions: u64,

    // Execute side, measured at the session boundary: the executor call itself
    // and the follow-up response finalization (only the grouped path measures
    // finalization; it is zero otherwise).
    pub executor_invocation_local_instructions: u64,
    pub response_finalization_local_instructions: u64,

    // Executor-reported phase split. For grouped execution the runtime value is
    // the saturating sum of the grouped stream and fold phases.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,

    // Direct data-row detail, copied from the scalar load phase attribution;
    // zero for grouped and delete execution.
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,

    // Grouped phase detail, copied from the grouped phase attribution; zero for
    // scalar and delete execution.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // Grouped-count detail, flattened from the executor's
    // `GroupedCountAttribution` payload.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // Reported by the scalar load path only; zero for grouped and delete.
    pub response_decode_local_instructions: u64,

    // Totals: execute = executor invocation + response finalization, and
    // total = compile + execute (both saturating additions).
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,

    // Shared query-plan cache outcome, taken from the plan lookup's cache
    // attribution.
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
121
// Session-private carrier for the execute-side counters of one query run. It
// normalizes the scalar, grouped, and delete execution paths into one shape
// before the values are copied into the public `QueryExecutionAttribution`.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    // Measured at the session boundary around the executor call and the
    // response finalization step.
    executor_invocation_local_instructions: u64,
    response_finalization_local_instructions: u64,
    // Executor-reported runtime/finalize split.
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    // Direct data-row detail (filled by the scalar path, zero otherwise).
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    // Grouped phase detail (filled by the grouped path, zero otherwise).
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    // Grouped-count payload; `GroupedCountAttribution::none()` when not grouped.
    grouped_count: GroupedCountAttribution,
}
141
// Read the local instruction counter used for diagnostics attribution.
//
// On `wasm32` this reads performance counter `1` through `canic_cdk`; on every
// other target no counter is available and the function returns `0`, so any
// delta computed from it is zero off-wasm.
//
// NOTE(review): counter `1` is presumably the call-context instruction
// counter — confirm against the `canic_cdk` documentation.
//
// The lint expectation is scoped to non-wasm targets. Only there is the body a
// plain constant that `missing_const_for_fn` flags; on `wasm32` the lint does
// not fire, so an unconditional `#[expect]` would itself be reported as an
// unfulfilled expectation.
#[cfg(feature = "diagnostics")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
158
// Run one query stage and return the local instructions it consumed together
// with the stage's own result, whether it succeeded or failed.
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    // Saturating: a counter that reads lower afterwards yields zero.
    (after.saturating_sub(before), outcome)
}
167
168impl<C: CanisterKind> DbSession<C> {
169    #[cfg(feature = "diagnostics")]
170    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
171        QueryExecutePhaseAttribution {
172            executor_invocation_local_instructions: 0,
173            response_finalization_local_instructions: 0,
174            runtime_local_instructions: 0,
175            finalize_local_instructions: 0,
176            direct_data_row_scan_local_instructions: 0,
177            direct_data_row_key_stream_local_instructions: 0,
178            direct_data_row_row_read_local_instructions: 0,
179            direct_data_row_key_encode_local_instructions: 0,
180            direct_data_row_store_get_local_instructions: 0,
181            direct_data_row_order_window_local_instructions: 0,
182            direct_data_row_page_window_local_instructions: 0,
183            grouped_stream_local_instructions: 0,
184            grouped_fold_local_instructions: 0,
185            grouped_finalize_local_instructions: 0,
186            grouped_count: GroupedCountAttribution::none(),
187        }
188    }
189
190    #[cfg(feature = "diagnostics")]
191    const fn scalar_query_execute_phase_attribution(
192        phase: ScalarExecutePhaseAttribution,
193        executor_invocation_local_instructions: u64,
194    ) -> QueryExecutePhaseAttribution {
195        QueryExecutePhaseAttribution {
196            executor_invocation_local_instructions,
197            response_finalization_local_instructions: 0,
198            runtime_local_instructions: phase.runtime_local_instructions,
199            finalize_local_instructions: phase.finalize_local_instructions,
200            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
201            direct_data_row_key_stream_local_instructions: phase
202                .direct_data_row_key_stream_local_instructions,
203            direct_data_row_row_read_local_instructions: phase
204                .direct_data_row_row_read_local_instructions,
205            direct_data_row_key_encode_local_instructions: phase
206                .direct_data_row_key_encode_local_instructions,
207            direct_data_row_store_get_local_instructions: phase
208                .direct_data_row_store_get_local_instructions,
209            direct_data_row_order_window_local_instructions: phase
210                .direct_data_row_order_window_local_instructions,
211            direct_data_row_page_window_local_instructions: phase
212                .direct_data_row_page_window_local_instructions,
213            grouped_stream_local_instructions: 0,
214            grouped_fold_local_instructions: 0,
215            grouped_finalize_local_instructions: 0,
216            grouped_count: GroupedCountAttribution::none(),
217        }
218    }
219
220    #[cfg(feature = "diagnostics")]
221    const fn grouped_query_execute_phase_attribution(
222        phase: GroupedExecutePhaseAttribution,
223        executor_invocation_local_instructions: u64,
224        response_finalization_local_instructions: u64,
225    ) -> QueryExecutePhaseAttribution {
226        QueryExecutePhaseAttribution {
227            executor_invocation_local_instructions,
228            response_finalization_local_instructions,
229            runtime_local_instructions: phase
230                .stream_local_instructions
231                .saturating_add(phase.fold_local_instructions),
232            finalize_local_instructions: phase.finalize_local_instructions,
233            direct_data_row_scan_local_instructions: 0,
234            direct_data_row_key_stream_local_instructions: 0,
235            direct_data_row_row_read_local_instructions: 0,
236            direct_data_row_key_encode_local_instructions: 0,
237            direct_data_row_store_get_local_instructions: 0,
238            direct_data_row_order_window_local_instructions: 0,
239            direct_data_row_page_window_local_instructions: 0,
240            grouped_stream_local_instructions: phase.stream_local_instructions,
241            grouped_fold_local_instructions: phase.fold_local_instructions,
242            grouped_finalize_local_instructions: phase.finalize_local_instructions,
243            grouped_count: phase.grouped_count,
244        }
245    }
246
247    // Compile one typed query using only the indexes currently visible for the
248    // query's recovered store.
249    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
250        &self,
251        query: &Query<E>,
252    ) -> Result<CompiledQuery<E>, QueryError>
253    where
254        E: EntityKind<Canister = C>,
255    {
256        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
257    }
258
259    // Build one logical planned-query shell using only the indexes currently
260    // visible for the query's recovered store.
261    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
262        &self,
263        query: &Query<E>,
264    ) -> Result<PlannedQuery<E>, QueryError>
265    where
266        E: EntityKind<Canister = C>,
267    {
268        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
269    }
270
271    // Project one logical explain payload using only planner-visible indexes.
272    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
273        &self,
274        query: &Query<E>,
275    ) -> Result<ExplainPlan, QueryError>
276    where
277        E: EntityKind<Canister = C>,
278    {
279        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
280    }
281
282    // Hash one typed query plan using only the indexes currently visible for
283    // the query's recovered store.
284    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
285        &self,
286        query: &Query<E>,
287    ) -> Result<String, QueryError>
288    where
289        E: EntityKind<Canister = C>,
290    {
291        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
292    }
293
294    // Explain one load execution shape using only planner-visible
295    // indexes from the recovered store state.
296    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
297        &self,
298        query: &Query<E>,
299    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
300    where
301        E: EntityValue + EntityKind<Canister = C>,
302    {
303        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
304    }
305
306    // Render one load execution descriptor plus route diagnostics using
307    // only planner-visible indexes from the recovered store state.
308    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
309        &self,
310        query: &Query<E>,
311    ) -> Result<String, QueryError>
312    where
313        E: EntityValue + EntityKind<Canister = C>,
314    {
315        self.with_query_visible_indexes(query, |query, visible_indexes| {
316            let (prepared_plan, cache_attribution) =
317                self.cached_prepared_query_plan_for_entity(query)?;
318            let mut plan = prepared_plan.logical_plan().clone();
319
320            // Freeze the same planner-owned explain access-choice snapshot used
321            // by the direct non-cached explain path before rendering verbose
322            // diagnostics from the reused logical plan.
323            plan.finalize_access_choice_for_model_with_indexes(
324                query.structural().model(),
325                visible_indexes.as_slice(),
326            );
327
328            query
329                .structural()
330                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
331                    &plan,
332                    Some(query_plan_cache_reuse_event(cache_attribution)),
333                    |_| {},
334                )
335                .map(|diagnostics| diagnostics.render_text_verbose())
336        })
337    }
338
339    // Explain one prepared fluent aggregate terminal using only
340    // planner-visible indexes from the recovered store state.
341    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
342        &self,
343        query: &Query<E>,
344        strategy: &S,
345    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
346    where
347        E: EntityValue + EntityKind<Canister = C>,
348        S: PreparedFluentAggregateExplainStrategy,
349    {
350        self.with_query_visible_indexes(query, |query, visible_indexes| {
351            query
352                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
353        })
354    }
355
356    // Explain one `bytes_by(field)` terminal using only planner-visible
357    // indexes from the recovered store state.
358    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
359        &self,
360        query: &Query<E>,
361        target_field: &str,
362    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
363    where
364        E: EntityValue + EntityKind<Canister = C>,
365    {
366        self.with_query_visible_indexes(query, |query, visible_indexes| {
367            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
368        })
369    }
370
371    // Explain one prepared fluent projection terminal using only
372    // planner-visible indexes from the recovered store state.
373    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
374        &self,
375        query: &Query<E>,
376        strategy: &PreparedFluentProjectionStrategy,
377    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
378    where
379        E: EntityValue + EntityKind<Canister = C>,
380    {
381        self.with_query_visible_indexes(query, |query, visible_indexes| {
382            query.explain_prepared_projection_terminal_with_visible_indexes(
383                visible_indexes,
384                strategy,
385            )
386        })
387    }
388
389    // Validate that one execution strategy is admissible for scalar paged load
390    // execution and fail closed on grouped/primary-key-only routes.
391    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
392        match family {
393            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
394                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
395            )),
396            ExecutionFamily::Ordered => Ok(()),
397            ExecutionFamily::Grouped => Err(QueryError::invariant(
398                "grouped queries execute via execute(), not page().execute()",
399            )),
400        }
401    }
402
403    // Validate that one execution strategy is admissible for the grouped
404    // execution surface.
405    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
406        match family {
407            ExecutionFamily::Grouped => Ok(()),
408            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
409                "grouped execution requires grouped logical plans",
410            )),
411        }
412    }
413
    // Finalize one grouped cursor page into the outward grouped execution
    // payload so grouped cursor encoding and continuation-shape validation
    // stay owned by the session boundary.
    //
    // This is a thin associated-function wrapper: it forwards `page` and the
    // optional execution `trace` unchanged to the session-level
    // `finalize_grouped_paged_execution` helper and returns its result as-is.
    fn finalize_grouped_execution_page(
        page: GroupedCursorPage,
        trace: Option<ExecutionTrace>,
    ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
        finalize_grouped_paged_execution(page, trace)
    }
423
424    /// Execute one scalar load/delete query and return materialized response rows.
425    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
426    where
427        E: PersistedRow<Canister = C> + EntityValue,
428    {
429        // Phase 1: compile typed intent into one prepared execution-plan contract.
430        let mode = query.mode();
431        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
432
433        // Phase 2: delegate execution to the shared compiled-plan entry path.
434        self.execute_query_dyn(mode, plan)
435    }
436
    /// Execute one typed query while reporting the compile/execute split at
    /// the shared fluent query seam.
    ///
    /// Returns the query result (rows or grouped page) together with a
    /// `QueryExecutionAttribution` describing where local instructions were
    /// spent. Any planning, execution, or finalization error is returned
    /// without an attribution payload.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    #[expect(
        clippy::too_many_lines,
        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
    )]
    pub fn execute_query_result_with_attribution<E>(
        &self,
        query: &Query<E>,
    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure compile work at the typed/fluent boundary,
        // including the shared lower query-plan cache lookup/build exactly
        // once. This preserves honest hit/miss attribution without
        // double-building plans on one-shot cache misses.
        let (plan_lookup_local_instructions, plan_and_cache) =
            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
        let (plan, cache_attribution) = plan_and_cache?;
        // The plan lookup is the only compile-side work measured here, so the
        // compile total is reported as the same value.
        let compile_local_instructions = plan_lookup_local_instructions;

        // Phase 2: execute one query result using the prepared plan produced
        // by the compile/cache boundary above.
        //
        // The immediately-invoked closure scopes `?` to the execution step; it
        // yields (result, execute-phase attribution, response-decode count).
        let result =
            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
                if query.has_grouping() {
                    // Grouped path: measure the executor call and the response
                    // finalization as two separate stages.
                    let (executor_invocation_local_instructions, grouped_page) =
                        measure_query_stage(|| {
                            self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
                                executor
                                    .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
                                        plan, cursor,
                                    )
                            })
                        });
                    let (page, trace, phase_attribution) = grouped_page?;
                    let (response_finalization_local_instructions, grouped) =
                        measure_query_stage(|| Self::finalize_grouped_execution_page(page, trace));
                    let grouped = grouped?;

                    // No response-decode count is reported for grouped results.
                    Ok((
                        LoadQueryResult::Grouped(grouped),
                        Self::grouped_query_execute_phase_attribution(
                            phase_attribution,
                            executor_invocation_local_instructions,
                            response_finalization_local_instructions,
                        ),
                        0,
                    ))
                } else {
                    match query.mode() {
                        QueryMode::Load(_) => {
                            // Scalar load: the executor reports its own phase
                            // split and the response-decode count.
                            let (executor_invocation_local_instructions, executed) =
                                measure_query_stage(|| {
                                    self.load_executor::<E>()
                                        .execute_with_phase_attribution(plan)
                                        .map_err(QueryError::execute)
                                });
                            let (rows, phase_attribution, response_decode_local_instructions) =
                                executed?;

                            Ok((
                                LoadQueryResult::Rows(rows),
                                Self::scalar_query_execute_phase_attribution(
                                    phase_attribution,
                                    executor_invocation_local_instructions,
                                ),
                                response_decode_local_instructions,
                            ))
                        }
                        QueryMode::Delete(_) => {
                            // Delete: only the overall invocation is measured;
                            // every other phase counter stays at zero.
                            let (executor_invocation_local_instructions, result) =
                                measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
                            let result = result?;

                            Ok((
                                LoadQueryResult::Rows(result),
                                QueryExecutePhaseAttribution {
                                    executor_invocation_local_instructions,
                                    ..Self::empty_query_execute_phase_attribution()
                                },
                                0,
                            ))
                        }
                    }
                }
            }();
        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
        // Execute total = executor invocation + response finalization; the
        // overall total adds the compile side. Both additions saturate.
        let execute_local_instructions = execute_phase_attribution
            .executor_invocation_local_instructions
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);

        // Flatten the session-private phase attribution (including the nested
        // grouped-count payload) into the public attribution record.
        Ok((
            result,
            QueryExecutionAttribution {
                compile_local_instructions,
                plan_lookup_local_instructions,
                executor_invocation_local_instructions: execute_phase_attribution
                    .executor_invocation_local_instructions,
                response_finalization_local_instructions: execute_phase_attribution
                    .response_finalization_local_instructions,
                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
                direct_data_row_scan_local_instructions: execute_phase_attribution
                    .direct_data_row_scan_local_instructions,
                direct_data_row_key_stream_local_instructions: execute_phase_attribution
                    .direct_data_row_key_stream_local_instructions,
                direct_data_row_row_read_local_instructions: execute_phase_attribution
                    .direct_data_row_row_read_local_instructions,
                direct_data_row_key_encode_local_instructions: execute_phase_attribution
                    .direct_data_row_key_encode_local_instructions,
                direct_data_row_store_get_local_instructions: execute_phase_attribution
                    .direct_data_row_store_get_local_instructions,
                direct_data_row_order_window_local_instructions: execute_phase_attribution
                    .direct_data_row_order_window_local_instructions,
                direct_data_row_page_window_local_instructions: execute_phase_attribution
                    .direct_data_row_page_window_local_instructions,
                grouped_stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                grouped_fold_local_instructions: execute_phase_attribution
                    .grouped_fold_local_instructions,
                grouped_finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                grouped_count_borrowed_hash_computations: execute_phase_attribution
                    .grouped_count
                    .borrowed_hash_computations,
                grouped_count_bucket_candidate_checks: execute_phase_attribution
                    .grouped_count
                    .bucket_candidate_checks,
                grouped_count_existing_group_hits: execute_phase_attribution
                    .grouped_count
                    .existing_group_hits,
                grouped_count_new_group_inserts: execute_phase_attribution
                    .grouped_count
                    .new_group_inserts,
                grouped_count_row_materialization_local_instructions: execute_phase_attribution
                    .grouped_count
                    .row_materialization_local_instructions,
                grouped_count_group_lookup_local_instructions: execute_phase_attribution
                    .grouped_count
                    .group_lookup_local_instructions,
                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
                    .grouped_count
                    .existing_group_update_local_instructions,
                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
                    .grouped_count
                    .new_group_insert_local_instructions,
                response_decode_local_instructions,
                execute_local_instructions,
                total_local_instructions,
                shared_query_plan_cache_hits: cache_attribution.hits,
                shared_query_plan_cache_misses: cache_attribution.misses,
            },
        ))
    }
597
598    // Execute one typed query through the unified row/grouped result surface so
599    // higher layers do not need to branch on grouped shape themselves.
600    #[doc(hidden)]
601    pub fn execute_query_result<E>(
602        &self,
603        query: &Query<E>,
604    ) -> Result<LoadQueryResult<E>, QueryError>
605    where
606        E: PersistedRow<Canister = C> + EntityValue,
607    {
608        if query.has_grouping() {
609            return self
610                .execute_grouped(query, None)
611                .map(LoadQueryResult::Grouped);
612        }
613
614        self.execute_query(query).map(LoadQueryResult::Rows)
615    }
616
617    /// Execute one typed delete query and return only the affected-row count.
618    #[doc(hidden)]
619    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
620    where
621        E: PersistedRow<Canister = C> + EntityValue,
622    {
623        // Phase 1: fail closed if the caller routes a non-delete query here.
624        if !query.mode().is_delete() {
625            return Err(QueryError::unsupported_query(
626                "delete count execution requires delete query mode",
627            ));
628        }
629
630        // Phase 2: resolve one cached prepared execution-plan contract directly
631        // from the shared lower boundary instead of rebuilding it through the
632        // typed compiled-query wrapper.
633        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
634
635        // Phase 3: execute the shared delete core while skipping response-row materialization.
636        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
637            .map_err(QueryError::execute)
638    }
639
640    /// Execute one scalar query from one pre-built prepared execution contract.
641    ///
642    /// This is the shared compiled-plan entry boundary used by the typed
643    /// `execute_query(...)` surface and adjacent query execution facades.
644    pub(in crate::db) fn execute_query_dyn<E>(
645        &self,
646        mode: QueryMode,
647        plan: PreparedExecutionPlan<E>,
648    ) -> Result<EntityResponse<E>, QueryError>
649    where
650        E: PersistedRow<Canister = C> + EntityValue,
651    {
652        let result = match mode {
653            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
654            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
655        };
656
657        result.map_err(QueryError::execute)
658    }
659
660    // Shared load-query terminal wrapper: build plan, run under metrics, map
661    // execution errors into query-facing errors.
662    pub(in crate::db) fn execute_load_query_with<E, T>(
663        &self,
664        query: &Query<E>,
665        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
666    ) -> Result<T, QueryError>
667    where
668        E: PersistedRow<Canister = C> + EntityValue,
669    {
670        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
671
672        self.with_metrics(|| op(self.load_executor::<E>(), plan))
673            .map_err(QueryError::execute)
674    }
675
676    // Execute one scalar terminal boundary and keep the executor-specific
677    // request/output types contained inside the session adapter.
678    fn execute_scalar_terminal_boundary<E>(
679        &self,
680        query: &Query<E>,
681        request: ScalarTerminalBoundaryRequest,
682    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
683    where
684        E: PersistedRow<Canister = C> + EntityValue,
685    {
686        self.execute_load_query_with(query, move |load, plan| {
687            load.execute_scalar_terminal_request(plan, request)
688        })
689    }
690
691    // Execute one projection terminal boundary and keep field projection
692    // executor details out of fluent query modules.
693    fn execute_scalar_projection_boundary<E>(
694        &self,
695        query: &Query<E>,
696        target_field: FieldSlot,
697        request: ScalarProjectionBoundaryRequest,
698    ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
699    where
700        E: PersistedRow<Canister = C> + EntityValue,
701    {
702        self.execute_load_query_with(query, move |load, plan| {
703            load.execute_scalar_projection_boundary(plan, target_field, request)
704        })
705    }
706
707    // Execute one fluent count/exists terminal through a query-owned result
708    // shape so fluent terminals do not import executor aggregate outputs.
709    pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
710        &self,
711        query: &Query<E>,
712        strategy: PreparedFluentExistingRowsTerminalStrategy,
713    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
714    where
715        E: PersistedRow<Canister = C> + EntityValue,
716    {
717        match strategy.into_runtime_request() {
718            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => self
719                .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Count)?
720                .into_count()
721                .map(FluentScalarTerminalOutput::Count)
722                .map_err(QueryError::execute),
723            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => self
724                .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Exists)?
725                .into_exists()
726                .map(FluentScalarTerminalOutput::Exists)
727                .map_err(QueryError::execute),
728        }
729    }
730
731    // Execute one fluent id/extrema terminal through a query-owned result
732    // shape after the session adapter has decoded storage keys into typed ids.
733    pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
734        &self,
735        query: &Query<E>,
736        strategy: PreparedFluentScalarTerminalStrategy,
737    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
738    where
739        E: PersistedRow<Canister = C> + EntityValue,
740    {
741        let request = match strategy.into_runtime_request() {
742            PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
743                ScalarTerminalBoundaryRequest::IdTerminal { kind }
744            }
745            PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
746                ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
747            }
748        };
749
750        self.execute_scalar_terminal_boundary(query, request)?
751            .into_id::<E>()
752            .map(FluentScalarTerminalOutput::Id)
753            .map_err(QueryError::execute)
754    }
755
756    // Execute one fluent order-sensitive terminal through the session adapter.
757    // The min/max pair request remains distinguished because it returns two ids.
758    pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
759        &self,
760        query: &Query<E>,
761        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
762    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
763    where
764        E: PersistedRow<Canister = C> + EntityValue,
765    {
766        match strategy.into_runtime_request() {
767            PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => self
768                .execute_scalar_terminal_boundary(
769                    query,
770                    ScalarTerminalBoundaryRequest::IdTerminal { kind },
771                )?
772                .into_id::<E>()
773                .map(FluentScalarTerminalOutput::Id)
774                .map_err(QueryError::execute),
775            PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
776                self.execute_scalar_terminal_boundary(
777                    query,
778                    ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth },
779                )?
780                .into_id::<E>()
781                .map(FluentScalarTerminalOutput::Id)
782                .map_err(QueryError::execute)
783            }
784            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
785                self.execute_scalar_terminal_boundary(
786                    query,
787                    ScalarTerminalBoundaryRequest::MedianBySlot { target_field },
788                )?
789                .into_id::<E>()
790                .map(FluentScalarTerminalOutput::Id)
791                .map_err(QueryError::execute)
792            }
793            PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
794                self.execute_scalar_terminal_boundary(
795                    query,
796                    ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field },
797                )?
798                .into_id_pair::<E>()
799                .map(FluentScalarTerminalOutput::IdPair)
800                .map_err(QueryError::execute)
801            }
802        }
803    }
804
805    // Execute one fluent numeric-field terminal through the session-owned
806    // request conversion layer.
807    pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
808        &self,
809        query: &Query<E>,
810        strategy: PreparedFluentNumericFieldStrategy,
811    ) -> Result<Option<Decimal>, QueryError>
812    where
813        E: PersistedRow<Canister = C> + EntityValue,
814    {
815        let (target_field, runtime_request) = strategy.into_runtime_parts();
816        let request = match runtime_request {
817            PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
818            PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
819                ScalarNumericFieldBoundaryRequest::SumDistinct
820            }
821            PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
822            PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
823                ScalarNumericFieldBoundaryRequest::AvgDistinct
824            }
825        };
826
827        self.execute_load_query_with(query, move |load, plan| {
828            load.execute_numeric_field_boundary(plan, target_field, request)
829        })
830    }
831
832    // Execute one fluent projection terminal through a query-owned output
833    // shape after the session adapter has decoded any data keys into typed ids.
834    pub(in crate::db) fn execute_fluent_projection_terminal<E>(
835        &self,
836        query: &Query<E>,
837        strategy: PreparedFluentProjectionStrategy,
838    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
839    where
840        E: PersistedRow<Canister = C> + EntityValue,
841    {
842        let (target_field, runtime_request) = strategy.into_runtime_parts();
843
844        match runtime_request {
845            PreparedFluentProjectionRuntimeRequest::Values => self
846                .execute_scalar_projection_boundary(
847                    query,
848                    target_field,
849                    ScalarProjectionBoundaryRequest::Values,
850                )?
851                .into_values()
852                .map(FluentProjectionTerminalOutput::Values)
853                .map_err(QueryError::execute),
854            PreparedFluentProjectionRuntimeRequest::DistinctValues => self
855                .execute_scalar_projection_boundary(
856                    query,
857                    target_field,
858                    ScalarProjectionBoundaryRequest::DistinctValues,
859                )?
860                .into_values()
861                .map(FluentProjectionTerminalOutput::Values)
862                .map_err(QueryError::execute),
863            PreparedFluentProjectionRuntimeRequest::CountDistinct => self
864                .execute_scalar_projection_boundary(
865                    query,
866                    target_field,
867                    ScalarProjectionBoundaryRequest::CountDistinct,
868                )?
869                .into_count()
870                .map(FluentProjectionTerminalOutput::Count)
871                .map_err(QueryError::execute),
872            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => self
873                .execute_scalar_projection_boundary(
874                    query,
875                    target_field,
876                    ScalarProjectionBoundaryRequest::ValuesWithIds,
877                )?
878                .into_values_with_ids::<E>()
879                .map(FluentProjectionTerminalOutput::ValuesWithIds)
880                .map_err(QueryError::execute),
881            PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => self
882                .execute_scalar_projection_boundary(
883                    query,
884                    target_field,
885                    ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind },
886                )?
887                .into_terminal_value()
888                .map(FluentProjectionTerminalOutput::TerminalValue)
889                .map_err(QueryError::execute),
890        }
891    }
892
893    // Execute the fluent `bytes()` terminal without leaking `LoadExecutor`
894    // closure assembly into query fluent code.
895    pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
896    where
897        E: PersistedRow<Canister = C> + EntityValue,
898    {
899        self.execute_load_query_with(query, |load, plan| load.bytes(plan))
900    }
901
902    // Execute the fluent `bytes_by(field)` terminal at the session boundary.
903    pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
904        &self,
905        query: &Query<E>,
906        target_slot: FieldSlot,
907    ) -> Result<u64, QueryError>
908    where
909        E: PersistedRow<Canister = C> + EntityValue,
910    {
911        self.execute_load_query_with(query, move |load, plan| {
912            load.bytes_by_slot(plan, target_slot)
913        })
914    }
915
916    // Execute the fluent `take(k)` terminal at the session boundary.
917    pub(in crate::db) fn execute_fluent_take<E>(
918        &self,
919        query: &Query<E>,
920        take_count: u32,
921    ) -> Result<EntityResponse<E>, QueryError>
922    where
923        E: PersistedRow<Canister = C> + EntityValue,
924    {
925        self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
926    }
927
928    // Execute one row-returning fluent top/bottom-k terminal at the session boundary.
929    pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
930        &self,
931        query: &Query<E>,
932        target_slot: FieldSlot,
933        take_count: u32,
934        descending: bool,
935    ) -> Result<EntityResponse<E>, QueryError>
936    where
937        E: PersistedRow<Canister = C> + EntityValue,
938    {
939        self.execute_load_query_with(query, move |load, plan| {
940            if descending {
941                load.top_k_by_slot(plan, target_slot, take_count)
942            } else {
943                load.bottom_k_by_slot(plan, target_slot, take_count)
944            }
945        })
946    }
947
948    // Execute one value-returning fluent top/bottom-k terminal at the session boundary.
949    pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
950        &self,
951        query: &Query<E>,
952        target_slot: FieldSlot,
953        take_count: u32,
954        descending: bool,
955    ) -> Result<Vec<Value>, QueryError>
956    where
957        E: PersistedRow<Canister = C> + EntityValue,
958    {
959        self.execute_load_query_with(query, move |load, plan| {
960            if descending {
961                load.top_k_by_values_slot(plan, target_slot, take_count)
962            } else {
963                load.bottom_k_by_values_slot(plan, target_slot, take_count)
964            }
965        })
966    }
967
968    // Execute one id/value-returning fluent top/bottom-k terminal at the session boundary.
969    pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
970        &self,
971        query: &Query<E>,
972        target_slot: FieldSlot,
973        take_count: u32,
974        descending: bool,
975    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
976    where
977        E: PersistedRow<Canister = C> + EntityValue,
978    {
979        self.execute_load_query_with(query, move |load, plan| {
980            if descending {
981                load.top_k_by_with_ids_slot(plan, target_slot, take_count)
982            } else {
983                load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
984            }
985        })
986    }
987
988    /// Build one trace payload for a query without executing it.
989    ///
990    /// This lightweight surface is intended for developer diagnostics:
991    /// plan hash, access strategy summary, and planner/executor route shape.
992    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
993    where
994        E: EntityKind<Canister = C>,
995    {
996        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
997        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
998        let (prepared_plan, cache_attribution) =
999            self.cached_prepared_query_plan_for_entity::<E>(query)?;
1000        let logical_plan = prepared_plan.logical_plan();
1001        let explain = logical_plan.explain();
1002        let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
1003        let executable_access = prepared_plan.access().executable_contract();
1004        let access_strategy = summarize_executable_access_plan(&executable_access);
1005        let execution_family = match query.mode() {
1006            QueryMode::Load(_) => Some(trace_execution_family_from_executor(
1007                prepared_plan
1008                    .execution_family()
1009                    .map_err(QueryError::execute)?,
1010            )),
1011            QueryMode::Delete(_) => None,
1012        };
1013        let reuse = query_plan_cache_reuse_event(cache_attribution);
1014
1015        Ok(QueryTracePlan::new(
1016            plan_hash,
1017            access_strategy,
1018            execution_family,
1019            reuse,
1020            explain,
1021        ))
1022    }
1023
1024    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
1025    pub(crate) fn execute_load_query_paged_with_trace<E>(
1026        &self,
1027        query: &Query<E>,
1028        cursor_token: Option<&str>,
1029    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
1030    where
1031        E: PersistedRow<Canister = C> + EntityValue,
1032    {
1033        // Phase 1: build/validate prepared execution plan and reject grouped plans.
1034        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1035        Self::ensure_scalar_paged_execution_family(
1036            plan.execution_family().map_err(QueryError::execute)?,
1037        )?;
1038
1039        // Phase 2: decode external cursor token and validate it against plan surface.
1040        let cursor_bytes = decode_optional_cursor_token(cursor_token)
1041            .map_err(QueryError::from_cursor_plan_error)?;
1042        let cursor = plan
1043            .prepare_cursor(cursor_bytes.as_deref())
1044            .map_err(query_error_from_executor_plan_error)?;
1045
1046        // Phase 3: execute one traced page and encode outbound continuation token.
1047        let (page, trace) = self
1048            .with_metrics(|| {
1049                self.load_executor::<E>()
1050                    .execute_paged_with_cursor_traced(plan, cursor)
1051            })
1052            .map_err(QueryError::execute)?;
1053        finalize_scalar_paged_execution(page, trace)
1054    }
1055
1056    /// Execute one grouped query page with optional grouped continuation cursor.
1057    ///
1058    /// This is the explicit grouped execution boundary; scalar load APIs reject
1059    /// grouped plans to preserve scalar response contracts.
1060    pub(in crate::db) fn execute_grouped<E>(
1061        &self,
1062        query: &Query<E>,
1063        cursor_token: Option<&str>,
1064    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1065    where
1066        E: PersistedRow<Canister = C> + EntityValue,
1067    {
1068        // Phase 1: build the prepared execution plan once from the typed query.
1069        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1070
1071        // Phase 2: reuse the shared prepared grouped execution path and then
1072        // finalize the outward grouped payload at the session boundary.
1073        let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1074
1075        Self::finalize_grouped_execution_page(page, trace)
1076    }
1077
1078    // Execute one grouped prepared plan page with optional grouped cursor
1079    // while letting the caller choose the final grouped-runtime dispatch.
1080    fn execute_grouped_plan_with<E, T>(
1081        &self,
1082        plan: PreparedExecutionPlan<E>,
1083        cursor_token: Option<&str>,
1084        op: impl FnOnce(
1085            LoadExecutor<E>,
1086            PreparedExecutionPlan<E>,
1087            crate::db::cursor::GroupedPlannedCursor,
1088        ) -> Result<T, InternalError>,
1089    ) -> Result<T, QueryError>
1090    where
1091        E: PersistedRow<Canister = C> + EntityValue,
1092    {
1093        // Phase 1: validate the prepared plan shape before decoding cursors.
1094        Self::ensure_grouped_execution_family(
1095            plan.execution_family().map_err(QueryError::execute)?,
1096        )?;
1097
1098        // Phase 2: decode external grouped cursor token and validate against plan.
1099        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1100            .map_err(QueryError::from_cursor_plan_error)?;
1101        let cursor = plan
1102            .prepare_grouped_cursor_token(cursor)
1103            .map_err(query_error_from_executor_plan_error)?;
1104
1105        // Phase 3: execute one grouped page while preserving the structural
1106        // grouped cursor payload for whichever outward cursor format the caller needs.
1107        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1108            .map_err(QueryError::execute)
1109    }
1110
1111    // Execute one grouped prepared plan page with optional grouped cursor.
1112    fn execute_grouped_plan_with_trace<E>(
1113        &self,
1114        plan: PreparedExecutionPlan<E>,
1115        cursor_token: Option<&str>,
1116    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1117    where
1118        E: PersistedRow<Canister = C> + EntityValue,
1119    {
1120        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1121            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1122        })
1123    }
1124}