Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::diagnostics::measure_local_instruction_delta as measure_query_stage;
11#[cfg(feature = "diagnostics")]
12use crate::db::executor::{
13    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
14};
15use crate::{
16    db::{
17        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
18        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
19        TraceExecutionFamily,
20        access::summarize_executable_access_plan,
21        cursor::{
22            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
23        },
24        diagnostics::ExecutionTrace,
25        executor::{
26            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
27            ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
28            ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
29            StructuralGroupedProjectionResult,
30        },
31        query::builder::{
32            PreparedFluentAggregateExplainStrategy,
33            PreparedFluentExistingRowsTerminalRuntimeRequest,
34            PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldStrategy,
35            PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
36            PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalStrategy,
37        },
38        query::explain::{
39            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40        },
41        query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42        query::{
43            intent::{CompiledQuery, PlannedQuery},
44            plan::{AccessPlannedQuery, FieldSlot, QueryMode, VisibleIndexes},
45        },
46        session::{finalize_scalar_paged_execution, finalize_structural_grouped_projection_result},
47    },
48    error::InternalError,
49    traits::{CanisterKind, EntityKind, EntityValue},
50    types::{Decimal, Id},
51    value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62// Translate executor route-family selection into the query-owned trace label
63// at the session boundary so trace DTOs do not depend on executor types.
64const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65    match family {
66        ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67        ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68        ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69    }
70}
71
72// Convert executor plan-surface failures at the session boundary so query error
73// types do not import executor-owned error enums.
74pub(in crate::db::session) fn query_error_from_executor_plan_error(
75    err: ExecutorPlanError,
76) -> QueryError {
77    match err {
78        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79    }
80}
81
///
/// QueryExecutionAttribution
///
/// Diagnostics-only report of the top-level compile/execute split for one
/// typed/fluent query executed at the session boundary.
/// All fields are additive counters: zero means that either no work or no
/// event was observed for that bucket. Any future non-additive diagnostic
/// must use an explicit presence type rather than leaning on `Default`.
///
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    // Compile side. `execute_query_result_with_attribution` currently reports
    // the measured plan lookup as the whole compile cost.
    pub compile_local_instructions: u64,
    pub plan_lookup_local_instructions: u64,

    // Execute side: the measured executor call and, on the grouped path, the
    // measured response finalization that follows it.
    pub executor_invocation_local_instructions: u64,
    pub response_finalization_local_instructions: u64,

    // Executor-reported phase split (for grouped queries the runtime bucket
    // is stream + fold).
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,

    // Scalar direct data-row buckets, copied from the scalar phase
    // attribution; left at zero on grouped and delete paths.
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,

    // Grouped phase buckets, copied from the grouped phase attribution; left
    // at zero on scalar and delete paths.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // Grouped-count counters, flattened from `GroupedCountAttribution`.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // Response decode cost as reported by the scalar load executor.
    pub response_decode_local_instructions: u64,

    // Roll-ups: execute = executor invocation + response finalization,
    // total = compile + execute (both saturating).
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,

    // Shared query-plan cache outcome observed during the plan lookup.
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
124
///
/// QueryExecutePhaseAttribution
///
/// Session-private carrier for the execute-side counters of one query run.
/// It normalizes the scalar and grouped executor phase attributions into one
/// shape before they are flattened into `QueryExecutionAttribution`.
///
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    // Measured at the session boundary.
    executor_invocation_local_instructions: u64,
    response_finalization_local_instructions: u64,

    // Executor-reported runtime/finalize split.
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,

    // Scalar direct data-row buckets.
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,

    // Grouped phase buckets and grouped-count counters.
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    grouped_count: GroupedCountAttribution,
}
144
145impl<C: CanisterKind> DbSession<C> {
146    #[cfg(feature = "diagnostics")]
147    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
148        QueryExecutePhaseAttribution {
149            executor_invocation_local_instructions: 0,
150            response_finalization_local_instructions: 0,
151            runtime_local_instructions: 0,
152            finalize_local_instructions: 0,
153            direct_data_row_scan_local_instructions: 0,
154            direct_data_row_key_stream_local_instructions: 0,
155            direct_data_row_row_read_local_instructions: 0,
156            direct_data_row_key_encode_local_instructions: 0,
157            direct_data_row_store_get_local_instructions: 0,
158            direct_data_row_order_window_local_instructions: 0,
159            direct_data_row_page_window_local_instructions: 0,
160            grouped_stream_local_instructions: 0,
161            grouped_fold_local_instructions: 0,
162            grouped_finalize_local_instructions: 0,
163            grouped_count: GroupedCountAttribution::none(),
164        }
165    }
166
167    #[cfg(feature = "diagnostics")]
168    const fn scalar_query_execute_phase_attribution(
169        phase: ScalarExecutePhaseAttribution,
170        executor_invocation_local_instructions: u64,
171    ) -> QueryExecutePhaseAttribution {
172        QueryExecutePhaseAttribution {
173            executor_invocation_local_instructions,
174            response_finalization_local_instructions: 0,
175            runtime_local_instructions: phase.runtime_local_instructions,
176            finalize_local_instructions: phase.finalize_local_instructions,
177            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
178            direct_data_row_key_stream_local_instructions: phase
179                .direct_data_row_key_stream_local_instructions,
180            direct_data_row_row_read_local_instructions: phase
181                .direct_data_row_row_read_local_instructions,
182            direct_data_row_key_encode_local_instructions: phase
183                .direct_data_row_key_encode_local_instructions,
184            direct_data_row_store_get_local_instructions: phase
185                .direct_data_row_store_get_local_instructions,
186            direct_data_row_order_window_local_instructions: phase
187                .direct_data_row_order_window_local_instructions,
188            direct_data_row_page_window_local_instructions: phase
189                .direct_data_row_page_window_local_instructions,
190            grouped_stream_local_instructions: 0,
191            grouped_fold_local_instructions: 0,
192            grouped_finalize_local_instructions: 0,
193            grouped_count: GroupedCountAttribution::none(),
194        }
195    }
196
197    #[cfg(feature = "diagnostics")]
198    const fn grouped_query_execute_phase_attribution(
199        phase: GroupedExecutePhaseAttribution,
200        executor_invocation_local_instructions: u64,
201        response_finalization_local_instructions: u64,
202    ) -> QueryExecutePhaseAttribution {
203        QueryExecutePhaseAttribution {
204            executor_invocation_local_instructions,
205            response_finalization_local_instructions,
206            runtime_local_instructions: phase
207                .stream_local_instructions
208                .saturating_add(phase.fold_local_instructions),
209            finalize_local_instructions: phase.finalize_local_instructions,
210            direct_data_row_scan_local_instructions: 0,
211            direct_data_row_key_stream_local_instructions: 0,
212            direct_data_row_row_read_local_instructions: 0,
213            direct_data_row_key_encode_local_instructions: 0,
214            direct_data_row_store_get_local_instructions: 0,
215            direct_data_row_order_window_local_instructions: 0,
216            direct_data_row_page_window_local_instructions: 0,
217            grouped_stream_local_instructions: phase.stream_local_instructions,
218            grouped_fold_local_instructions: phase.fold_local_instructions,
219            grouped_finalize_local_instructions: phase.finalize_local_instructions,
220            grouped_count: phase.grouped_count,
221        }
222    }
223
224    // Compile one typed query using only the indexes currently visible for the
225    // query's recovered store.
226    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
227        &self,
228        query: &Query<E>,
229    ) -> Result<CompiledQuery<E>, QueryError>
230    where
231        E: EntityKind<Canister = C>,
232    {
233        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
234    }
235
236    // Build one logical planned-query shell using only the indexes currently
237    // visible for the query's recovered store.
238    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
239        &self,
240        query: &Query<E>,
241    ) -> Result<PlannedQuery<E>, QueryError>
242    where
243        E: EntityKind<Canister = C>,
244    {
245        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
246    }
247
248    // Borrow the cached logical plan for read-only query diagnostics so explain
249    // and hash surfaces do not clone the full access-planned query.
250    fn try_map_cached_logical_query_plan<E, T>(
251        &self,
252        query: &Query<E>,
253        map: impl FnOnce(&AccessPlannedQuery) -> Result<T, QueryError>,
254    ) -> Result<T, QueryError>
255    where
256        E: EntityKind<Canister = C>,
257    {
258        self.try_map_cached_shared_query_plan_ref_for_entity::<E, T>(query, |prepared_plan| {
259            map(prepared_plan.logical_plan())
260        })
261    }
262
263    // Reuse the same cached logical plan as execution explain, then freeze the
264    // explain-only access-choice facts for the effective session-visible index
265    // slice before route facts are assembled.
266    fn cached_finalized_execution_explain_plan<E>(
267        &self,
268        query: &Query<E>,
269        visible_indexes: &VisibleIndexes<'_>,
270    ) -> Result<(AccessPlannedQuery, QueryPlanCacheAttribution), QueryError>
271    where
272        E: EntityKind<Canister = C>,
273    {
274        let (prepared_plan, cache_attribution) =
275            self.cached_shared_query_plan_for_entity::<E>(query)?;
276        let mut plan = prepared_plan.logical_plan().clone();
277
278        plan.finalize_access_choice_for_model_with_indexes(
279            query.structural().model(),
280            visible_indexes.as_slice(),
281        );
282
283        Ok((plan, cache_attribution))
284    }
285
286    // Project one logical explain payload using only planner-visible indexes.
287    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
288        &self,
289        query: &Query<E>,
290    ) -> Result<ExplainPlan, QueryError>
291    where
292        E: EntityKind<Canister = C>,
293    {
294        self.try_map_cached_logical_query_plan(query, |plan| Ok(plan.explain()))
295    }
296
297    // Hash one typed query plan using only the indexes currently visible for
298    // the query's recovered store.
299    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
300        &self,
301        query: &Query<E>,
302    ) -> Result<String, QueryError>
303    where
304        E: EntityKind<Canister = C>,
305    {
306        self.try_map_cached_logical_query_plan(query, |plan| Ok(plan.fingerprint().to_string()))
307    }
308
309    // Explain one load execution shape using only planner-visible
310    // indexes from the recovered store state.
311    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
312        &self,
313        query: &Query<E>,
314    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
315    where
316        E: EntityValue + EntityKind<Canister = C>,
317    {
318        self.with_query_visible_indexes(query, |query, visible_indexes| {
319            let (plan, _) =
320                self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
321
322            query
323                .structural()
324                .explain_execution_descriptor_from_plan(&plan)
325        })
326    }
327
328    // Render one load execution descriptor plus route diagnostics using
329    // only planner-visible indexes from the recovered store state.
330    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
331        &self,
332        query: &Query<E>,
333    ) -> Result<String, QueryError>
334    where
335        E: EntityValue + EntityKind<Canister = C>,
336    {
337        self.with_query_visible_indexes(query, |query, visible_indexes| {
338            let (plan, cache_attribution) =
339                self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
340
341            query
342                .structural()
343                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
344                    &plan,
345                    Some(query_plan_cache_reuse_event(cache_attribution)),
346                    |_| {},
347                )
348                .map(|diagnostics| diagnostics.render_text_verbose())
349        })
350    }
351
352    // Explain one prepared fluent aggregate terminal from the same cached
353    // prepared plan used by execution.
354    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
355        &self,
356        query: &Query<E>,
357        strategy: &S,
358    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
359    where
360        E: EntityValue + EntityKind<Canister = C>,
361        S: PreparedFluentAggregateExplainStrategy,
362    {
363        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
364
365        plan.explain_prepared_aggregate_terminal(strategy)
366    }
367
368    // Explain one `bytes_by(field)` terminal from the same cached prepared
369    // plan used by execution.
370    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
371        &self,
372        query: &Query<E>,
373        target_field: &str,
374    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
375    where
376        E: EntityValue + EntityKind<Canister = C>,
377    {
378        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
379
380        plan.explain_bytes_by_terminal(target_field)
381    }
382
383    // Explain one prepared fluent projection terminal from the same cached
384    // prepared plan used by execution.
385    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
386        &self,
387        query: &Query<E>,
388        strategy: &PreparedFluentProjectionStrategy,
389    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
390    where
391        E: EntityValue + EntityKind<Canister = C>,
392    {
393        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
394
395        plan.explain_prepared_projection_terminal(strategy)
396    }
397
398    // Validate that one execution strategy is admissible for scalar paged load
399    // execution and fail closed on grouped/primary-key-only routes.
400    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
401        match family {
402            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
403                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
404            )),
405            ExecutionFamily::Ordered => Ok(()),
406            ExecutionFamily::Grouped => Err(QueryError::invariant(
407                "grouped queries execute via execute(), not page().execute()",
408            )),
409        }
410    }
411
412    // Validate that one execution strategy is admissible for the grouped
413    // execution surface.
414    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
415        match family {
416            ExecutionFamily::Grouped => Ok(()),
417            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
418                "grouped execution requires grouped logical plans",
419            )),
420        }
421    }
422
423    /// Execute one scalar load/delete query and return materialized response rows.
424    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
425    where
426        E: PersistedRow<Canister = C> + EntityValue,
427    {
428        // Phase 1: compile typed intent into one prepared execution-plan contract.
429        let mode = query.mode();
430        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
431
432        // Phase 2: delegate execution to the shared compiled-plan entry path.
433        self.execute_query_dyn(mode, plan)
434    }
435
436    /// Execute one typed query while reporting the compile/execute split at
437    /// the shared fluent query seam.
438    #[cfg(feature = "diagnostics")]
439    #[doc(hidden)]
440    #[expect(
441        clippy::too_many_lines,
442        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
443    )]
444    #[expect(
445        clippy::needless_update,
446        reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
447    )]
448    pub fn execute_query_result_with_attribution<E>(
449        &self,
450        query: &Query<E>,
451    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
452    where
453        E: PersistedRow<Canister = C> + EntityValue,
454    {
455        // Phase 1: measure compile work at the typed/fluent boundary,
456        // including the shared lower query-plan cache lookup/build exactly
457        // once. This preserves honest hit/miss attribution without
458        // double-building plans on one-shot cache misses.
459        let (plan_lookup_local_instructions, plan_and_cache) =
460            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
461        let (plan, cache_attribution) = plan_and_cache?;
462        let compile_local_instructions = plan_lookup_local_instructions;
463
464        // Phase 2: execute one query result using the prepared plan produced
465        // by the compile/cache boundary above.
466        let result =
467            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
468                if query.has_grouping() {
469                    let (executor_invocation_local_instructions, grouped_page) =
470                        measure_query_stage(|| {
471                            self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
472                                executor
473                                    .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
474                                        plan, cursor,
475                                    )
476                            })
477                        });
478                    let (result, trace, phase_attribution) = grouped_page?;
479                    let (response_finalization_local_instructions, grouped) =
480                        measure_query_stage(|| {
481                            finalize_structural_grouped_projection_result(result, trace)
482                        });
483                    let grouped = grouped?;
484
485                    Ok((
486                        LoadQueryResult::Grouped(grouped),
487                        Self::grouped_query_execute_phase_attribution(
488                            phase_attribution,
489                            executor_invocation_local_instructions,
490                            response_finalization_local_instructions,
491                        ),
492                        0,
493                    ))
494                } else {
495                    match query.mode() {
496                        QueryMode::Load(_) => {
497                            let (executor_invocation_local_instructions, executed) =
498                                measure_query_stage(|| {
499                                    self.load_executor::<E>()
500                                        .execute_with_phase_attribution(plan)
501                                        .map_err(QueryError::execute)
502                                });
503                            let (rows, phase_attribution, response_decode_local_instructions) =
504                                executed?;
505
506                            Ok((
507                                LoadQueryResult::Rows(rows),
508                                Self::scalar_query_execute_phase_attribution(
509                                    phase_attribution,
510                                    executor_invocation_local_instructions,
511                                ),
512                                response_decode_local_instructions,
513                            ))
514                        }
515                        QueryMode::Delete(_) => {
516                            let (executor_invocation_local_instructions, result) =
517                                measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
518                            let result = result?;
519
520                            Ok((
521                                LoadQueryResult::Rows(result),
522                                QueryExecutePhaseAttribution {
523                                    executor_invocation_local_instructions,
524                                    ..Self::empty_query_execute_phase_attribution()
525                                },
526                                0,
527                            ))
528                        }
529                    }
530                }
531            }();
532        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
533        let execute_local_instructions = execute_phase_attribution
534            .executor_invocation_local_instructions
535            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
536        let total_local_instructions =
537            compile_local_instructions.saturating_add(execute_local_instructions);
538
539        Ok((
540            result,
541            QueryExecutionAttribution {
542                compile_local_instructions,
543                plan_lookup_local_instructions,
544                executor_invocation_local_instructions: execute_phase_attribution
545                    .executor_invocation_local_instructions,
546                response_finalization_local_instructions: execute_phase_attribution
547                    .response_finalization_local_instructions,
548                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
549                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
550                direct_data_row_scan_local_instructions: execute_phase_attribution
551                    .direct_data_row_scan_local_instructions,
552                direct_data_row_key_stream_local_instructions: execute_phase_attribution
553                    .direct_data_row_key_stream_local_instructions,
554                direct_data_row_row_read_local_instructions: execute_phase_attribution
555                    .direct_data_row_row_read_local_instructions,
556                direct_data_row_key_encode_local_instructions: execute_phase_attribution
557                    .direct_data_row_key_encode_local_instructions,
558                direct_data_row_store_get_local_instructions: execute_phase_attribution
559                    .direct_data_row_store_get_local_instructions,
560                direct_data_row_order_window_local_instructions: execute_phase_attribution
561                    .direct_data_row_order_window_local_instructions,
562                direct_data_row_page_window_local_instructions: execute_phase_attribution
563                    .direct_data_row_page_window_local_instructions,
564                grouped_stream_local_instructions: execute_phase_attribution
565                    .grouped_stream_local_instructions,
566                grouped_fold_local_instructions: execute_phase_attribution
567                    .grouped_fold_local_instructions,
568                grouped_finalize_local_instructions: execute_phase_attribution
569                    .grouped_finalize_local_instructions,
570                grouped_count_borrowed_hash_computations: execute_phase_attribution
571                    .grouped_count
572                    .borrowed_hash_computations,
573                grouped_count_bucket_candidate_checks: execute_phase_attribution
574                    .grouped_count
575                    .bucket_candidate_checks,
576                grouped_count_existing_group_hits: execute_phase_attribution
577                    .grouped_count
578                    .existing_group_hits,
579                grouped_count_new_group_inserts: execute_phase_attribution
580                    .grouped_count
581                    .new_group_inserts,
582                grouped_count_row_materialization_local_instructions: execute_phase_attribution
583                    .grouped_count
584                    .row_materialization_local_instructions,
585                grouped_count_group_lookup_local_instructions: execute_phase_attribution
586                    .grouped_count
587                    .group_lookup_local_instructions,
588                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
589                    .grouped_count
590                    .existing_group_update_local_instructions,
591                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
592                    .grouped_count
593                    .new_group_insert_local_instructions,
594                response_decode_local_instructions,
595                execute_local_instructions,
596                total_local_instructions,
597                shared_query_plan_cache_hits: cache_attribution.hits,
598                shared_query_plan_cache_misses: cache_attribution.misses,
599                ..QueryExecutionAttribution::default()
600            },
601        ))
602    }
603
604    // Execute one typed query through the unified row/grouped result surface so
605    // higher layers do not need to branch on grouped shape themselves.
606    #[doc(hidden)]
607    pub fn execute_query_result<E>(
608        &self,
609        query: &Query<E>,
610    ) -> Result<LoadQueryResult<E>, QueryError>
611    where
612        E: PersistedRow<Canister = C> + EntityValue,
613    {
614        if query.has_grouping() {
615            return self
616                .execute_grouped(query, None)
617                .map(LoadQueryResult::Grouped);
618        }
619
620        self.execute_query(query).map(LoadQueryResult::Rows)
621    }
622
623    /// Execute one typed delete query and return only the affected-row count.
624    #[doc(hidden)]
625    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
626    where
627        E: PersistedRow<Canister = C> + EntityValue,
628    {
629        // Phase 1: fail closed if the caller routes a non-delete query here.
630        if !query.mode().is_delete() {
631            return Err(QueryError::unsupported_query(
632                "delete count execution requires delete query mode",
633            ));
634        }
635
636        // Phase 2: resolve one cached prepared execution-plan contract directly
637        // from the shared lower boundary instead of rebuilding it through the
638        // typed compiled-query wrapper.
639        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
640
641        // Phase 3: execute the shared delete core while skipping response-row materialization.
642        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
643            .map_err(QueryError::execute)
644    }
645
646    /// Execute one scalar query from one pre-built prepared execution contract.
647    ///
648    /// This is the shared compiled-plan entry boundary used by the typed
649    /// `execute_query(...)` surface and adjacent query execution facades.
650    pub(in crate::db) fn execute_query_dyn<E>(
651        &self,
652        mode: QueryMode,
653        plan: PreparedExecutionPlan<E>,
654    ) -> Result<EntityResponse<E>, QueryError>
655    where
656        E: PersistedRow<Canister = C> + EntityValue,
657    {
658        let result = match mode {
659            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
660            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
661        };
662
663        result.map_err(QueryError::execute)
664    }
665
666    // Shared load-query terminal wrapper: build plan, run under metrics, map
667    // execution errors into query-facing errors.
668    pub(in crate::db) fn execute_load_query_with<E, T>(
669        &self,
670        query: &Query<E>,
671        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
672    ) -> Result<T, QueryError>
673    where
674        E: PersistedRow<Canister = C> + EntityValue,
675    {
676        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
677
678        self.with_metrics(|| op(self.load_executor::<E>(), plan))
679            .map_err(QueryError::execute)
680    }
681
682    // Execute one scalar terminal boundary and keep the executor-specific
683    // request/output types contained inside the session adapter.
684    fn execute_scalar_terminal_boundary<E>(
685        &self,
686        query: &Query<E>,
687        request: ScalarTerminalBoundaryRequest,
688    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
689    where
690        E: PersistedRow<Canister = C> + EntityValue,
691    {
692        self.execute_load_query_with(query, move |load, plan| {
693            load.execute_scalar_terminal_request(plan, request)
694        })
695    }
696
697    // Execute one projection terminal boundary and keep field projection
698    // executor details out of fluent query modules.
699    fn execute_scalar_projection_boundary<E>(
700        &self,
701        query: &Query<E>,
702        target_field: FieldSlot,
703        request: ScalarProjectionBoundaryRequest,
704    ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
705    where
706        E: PersistedRow<Canister = C> + EntityValue,
707    {
708        self.execute_load_query_with(query, move |load, plan| {
709            load.execute_scalar_projection_boundary(plan, target_field, request)
710        })
711    }
712
713    // Execute one fluent count/exists terminal through a query-owned result
714    // shape so fluent terminals do not import executor aggregate outputs.
715    pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
716        &self,
717        query: &Query<E>,
718        strategy: PreparedFluentExistingRowsTerminalStrategy,
719    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
720    where
721        E: PersistedRow<Canister = C> + EntityValue,
722    {
723        let (request, output_shape) = strategy.into_executor_request();
724        let output = self.execute_scalar_terminal_boundary(query, request)?;
725
726        match output_shape {
727            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => output
728                .into_count()
729                .map(FluentScalarTerminalOutput::Count)
730                .map_err(QueryError::execute),
731            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => output
732                .into_exists()
733                .map(FluentScalarTerminalOutput::Exists)
734                .map_err(QueryError::execute),
735        }
736    }
737
738    // Execute one fluent id/extrema terminal through a query-owned result
739    // shape after the session adapter has decoded storage keys into typed ids.
740    pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
741        &self,
742        query: &Query<E>,
743        strategy: PreparedFluentScalarTerminalStrategy,
744    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
745    where
746        E: PersistedRow<Canister = C> + EntityValue,
747    {
748        let request = strategy.into_executor_request();
749
750        self.execute_scalar_terminal_boundary(query, request)?
751            .into_id::<E>()
752            .map(FluentScalarTerminalOutput::Id)
753            .map_err(QueryError::execute)
754    }
755
756    // Execute one fluent order-sensitive terminal through the session adapter.
757    // The min/max pair request remains distinguished because it returns two ids.
758    pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
759        &self,
760        query: &Query<E>,
761        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
762    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
763    where
764        E: PersistedRow<Canister = C> + EntityValue,
765    {
766        let (request, returns_id_pair) = strategy.into_executor_request();
767        let output = self.execute_scalar_terminal_boundary(query, request)?;
768
769        if returns_id_pair {
770            return output
771                .into_id_pair::<E>()
772                .map(FluentScalarTerminalOutput::IdPair)
773                .map_err(QueryError::execute);
774        }
775
776        output
777            .into_id::<E>()
778            .map(FluentScalarTerminalOutput::Id)
779            .map_err(QueryError::execute)
780    }
781
782    // Execute one fluent numeric-field terminal through the session-owned
783    // request conversion layer.
784    pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
785        &self,
786        query: &Query<E>,
787        strategy: PreparedFluentNumericFieldStrategy,
788    ) -> Result<Option<Decimal>, QueryError>
789    where
790        E: PersistedRow<Canister = C> + EntityValue,
791    {
792        let (target_field, request) = strategy.into_executor_request();
793
794        self.execute_load_query_with(query, move |load, plan| {
795            load.execute_numeric_field_boundary(plan, target_field, request)
796        })
797    }
798
799    // Execute one fluent projection terminal through a query-owned output
800    // shape after the session adapter has decoded any data keys into typed ids.
801    pub(in crate::db) fn execute_fluent_projection_terminal<E>(
802        &self,
803        query: &Query<E>,
804        strategy: PreparedFluentProjectionStrategy,
805    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
806    where
807        E: PersistedRow<Canister = C> + EntityValue,
808    {
809        let (target_field, request, output_shape) = strategy.into_executor_request();
810        let output = self.execute_scalar_projection_boundary(query, target_field, request)?;
811
812        match output_shape {
813            PreparedFluentProjectionRuntimeRequest::Values
814            | PreparedFluentProjectionRuntimeRequest::DistinctValues => output
815                .into_values()
816                .map(FluentProjectionTerminalOutput::Values)
817                .map_err(QueryError::execute),
818            PreparedFluentProjectionRuntimeRequest::CountDistinct => output
819                .into_count()
820                .map(FluentProjectionTerminalOutput::Count)
821                .map_err(QueryError::execute),
822            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => output
823                .into_values_with_ids::<E>()
824                .map(FluentProjectionTerminalOutput::ValuesWithIds)
825                .map_err(QueryError::execute),
826            PreparedFluentProjectionRuntimeRequest::TerminalValue { .. } => output
827                .into_terminal_value()
828                .map(FluentProjectionTerminalOutput::TerminalValue)
829                .map_err(QueryError::execute),
830        }
831    }
832
833    // Execute the fluent `bytes()` terminal without leaking `LoadExecutor`
834    // closure assembly into query fluent code.
835    pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
836    where
837        E: PersistedRow<Canister = C> + EntityValue,
838    {
839        self.execute_load_query_with(query, |load, plan| load.bytes(plan))
840    }
841
842    // Execute the fluent `bytes_by(field)` terminal at the session boundary.
843    pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
844        &self,
845        query: &Query<E>,
846        target_slot: FieldSlot,
847    ) -> Result<u64, QueryError>
848    where
849        E: PersistedRow<Canister = C> + EntityValue,
850    {
851        self.execute_load_query_with(query, move |load, plan| {
852            load.bytes_by_slot(plan, target_slot)
853        })
854    }
855
856    // Execute the fluent `take(k)` terminal at the session boundary.
857    pub(in crate::db) fn execute_fluent_take<E>(
858        &self,
859        query: &Query<E>,
860        take_count: u32,
861    ) -> Result<EntityResponse<E>, QueryError>
862    where
863        E: PersistedRow<Canister = C> + EntityValue,
864    {
865        self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
866    }
867
868    // Execute one row-returning fluent top/bottom-k terminal at the session boundary.
869    pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
870        &self,
871        query: &Query<E>,
872        target_slot: FieldSlot,
873        take_count: u32,
874        descending: bool,
875    ) -> Result<EntityResponse<E>, QueryError>
876    where
877        E: PersistedRow<Canister = C> + EntityValue,
878    {
879        self.execute_load_query_with(query, move |load, plan| {
880            if descending {
881                load.top_k_by_slot(plan, target_slot, take_count)
882            } else {
883                load.bottom_k_by_slot(plan, target_slot, take_count)
884            }
885        })
886    }
887
888    // Execute one value-returning fluent top/bottom-k terminal at the session boundary.
889    pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
890        &self,
891        query: &Query<E>,
892        target_slot: FieldSlot,
893        take_count: u32,
894        descending: bool,
895    ) -> Result<Vec<Value>, QueryError>
896    where
897        E: PersistedRow<Canister = C> + EntityValue,
898    {
899        self.execute_load_query_with(query, move |load, plan| {
900            if descending {
901                load.top_k_by_values_slot(plan, target_slot, take_count)
902            } else {
903                load.bottom_k_by_values_slot(plan, target_slot, take_count)
904            }
905        })
906    }
907
908    // Execute one id/value-returning fluent top/bottom-k terminal at the session boundary.
909    pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
910        &self,
911        query: &Query<E>,
912        target_slot: FieldSlot,
913        take_count: u32,
914        descending: bool,
915    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
916    where
917        E: PersistedRow<Canister = C> + EntityValue,
918    {
919        self.execute_load_query_with(query, move |load, plan| {
920            if descending {
921                load.top_k_by_with_ids_slot(plan, target_slot, take_count)
922            } else {
923                load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
924            }
925        })
926    }
927
928    /// Build one trace payload for a query without executing it.
929    ///
930    /// This lightweight surface is intended for developer diagnostics:
931    /// plan hash, access strategy summary, and planner/executor route shape.
932    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
933    where
934        E: EntityKind<Canister = C>,
935    {
936        let (prepared_plan, cache_attribution) =
937            self.cached_prepared_query_plan_for_entity::<E>(query)?;
938        let logical_plan = prepared_plan.logical_plan();
939        let explain = logical_plan.explain();
940        let plan_hash = logical_plan.fingerprint().to_string();
941        let executable_access = prepared_plan.access().executable_contract();
942        let access_strategy = summarize_executable_access_plan(&executable_access);
943        let execution_family = match query.mode() {
944            QueryMode::Load(_) => Some(trace_execution_family_from_executor(
945                prepared_plan
946                    .execution_family()
947                    .map_err(QueryError::execute)?,
948            )),
949            QueryMode::Delete(_) => None,
950        };
951        let reuse = query_plan_cache_reuse_event(cache_attribution);
952
953        Ok(QueryTracePlan::new(
954            plan_hash,
955            access_strategy,
956            execution_family,
957            reuse,
958            explain,
959        ))
960    }
961
962    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
963    pub(crate) fn execute_load_query_paged_with_trace<E>(
964        &self,
965        query: &Query<E>,
966        cursor_token: Option<&str>,
967    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
968    where
969        E: PersistedRow<Canister = C> + EntityValue,
970    {
971        // Phase 1: build/validate prepared execution plan and reject grouped plans.
972        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
973        Self::ensure_scalar_paged_execution_family(
974            plan.execution_family().map_err(QueryError::execute)?,
975        )?;
976
977        // Phase 2: decode external cursor token and validate it against plan surface.
978        let cursor_bytes = decode_optional_cursor_token(cursor_token)
979            .map_err(QueryError::from_cursor_plan_error)?;
980        let cursor = plan
981            .prepare_cursor(cursor_bytes.as_deref())
982            .map_err(query_error_from_executor_plan_error)?;
983
984        // Phase 3: execute one traced page and encode outbound continuation token.
985        let (page, trace) = self
986            .with_metrics(|| {
987                self.load_executor::<E>()
988                    .execute_paged_with_cursor_traced(plan, cursor)
989            })
990            .map_err(QueryError::execute)?;
991        finalize_scalar_paged_execution(page, trace)
992    }
993
994    /// Execute one grouped query page with optional grouped continuation cursor.
995    ///
996    /// This is the explicit grouped execution boundary; scalar load APIs reject
997    /// grouped plans to preserve scalar response contracts.
998    pub(in crate::db) fn execute_grouped<E>(
999        &self,
1000        query: &Query<E>,
1001        cursor_token: Option<&str>,
1002    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1003    where
1004        E: PersistedRow<Canister = C> + EntityValue,
1005    {
1006        // Phase 1: build the prepared execution plan once from the typed query.
1007        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1008
1009        // Phase 2: reuse the shared prepared grouped execution path and then
1010        // finalize the outward grouped payload at the session boundary.
1011        let (result, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1012
1013        finalize_structural_grouped_projection_result(result, trace)
1014    }
1015
1016    // Execute one grouped prepared plan page with optional grouped cursor
1017    // while letting the caller choose the final grouped-runtime dispatch.
1018    pub(in crate::db::session) fn execute_grouped_plan_with<E, T>(
1019        &self,
1020        plan: PreparedExecutionPlan<E>,
1021        cursor_token: Option<&str>,
1022        op: impl FnOnce(
1023            LoadExecutor<E>,
1024            PreparedExecutionPlan<E>,
1025            crate::db::cursor::GroupedPlannedCursor,
1026        ) -> Result<T, InternalError>,
1027    ) -> Result<T, QueryError>
1028    where
1029        E: PersistedRow<Canister = C> + EntityValue,
1030    {
1031        // Phase 1: validate the prepared plan shape before decoding cursors.
1032        Self::ensure_grouped_execution_family(
1033            plan.execution_family().map_err(QueryError::execute)?,
1034        )?;
1035
1036        // Phase 2: decode external grouped cursor token and validate against plan.
1037        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1038            .map_err(QueryError::from_cursor_plan_error)?;
1039        let cursor = plan
1040            .prepare_grouped_cursor_token(cursor)
1041            .map_err(query_error_from_executor_plan_error)?;
1042
1043        // Phase 3: execute one grouped page while preserving the structural
1044        // grouped cursor payload for whichever outward cursor format the caller needs.
1045        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1046            .map_err(QueryError::execute)
1047    }
1048
1049    // Execute one grouped prepared plan result with optional grouped cursor.
1050    pub(in crate::db::session) fn execute_grouped_plan_with_trace<E>(
1051        &self,
1052        plan: PreparedExecutionPlan<E>,
1053        cursor_token: Option<&str>,
1054    ) -> Result<(StructuralGroupedProjectionResult, Option<ExecutionTrace>), QueryError>
1055    where
1056        E: PersistedRow<Canister = C> + EntityValue,
1057    {
1058        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1059            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1060        })
1061    }
1062}