Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::diagnostics::measure_local_instruction_delta as measure_query_stage;
11#[cfg(feature = "diagnostics")]
12use crate::db::executor::{
13    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
14};
15use crate::{
16    db::{
17        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
18        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
19        TraceExecutionFamily,
20        access::summarize_executable_access_plan,
21        cursor::{
22            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
23        },
24        diagnostics::ExecutionTrace,
25        executor::{
26            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
27            ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
28            ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
29            StructuralGroupedProjectionResult,
30        },
31        query::builder::{
32            PreparedFluentAggregateExplainStrategy,
33            PreparedFluentExistingRowsTerminalRuntimeRequest,
34            PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldStrategy,
35            PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
36            PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalStrategy,
37        },
38        query::explain::{
39            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40        },
41        query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42        query::{
43            intent::{CompiledQuery, PlannedQuery},
44            plan::{AccessPlannedQuery, FieldSlot, QueryMode, VisibleIndexes},
45        },
46        session::{finalize_scalar_paged_execution, finalize_structural_grouped_projection_result},
47    },
48    error::InternalError,
49    traits::{CanisterKind, EntityKind, EntityValue},
50    types::{Decimal, Id},
51    value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62// Translate executor route-family selection into the query-owned trace label
63// at the session boundary so trace DTOs do not depend on executor types.
64const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65    match family {
66        ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67        ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68        ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69    }
70}
71
72// Convert executor plan-surface failures at the session boundary so query error
73// types do not import executor-owned error enums.
74pub(in crate::db::session) fn query_error_from_executor_plan_error(
75    err: ExecutorPlanError,
76) -> QueryError {
77    match err {
78        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79    }
80}
81
82///
83/// QueryExecutionAttribution
84///
85/// QueryExecutionAttribution records the top-level compile/execute split for
86/// typed/fluent query execution at the session boundary.
87/// Every field is an additive counter where zero means no observed work or no
88/// observed event for that bucket. Future non-additive diagnostics must use an
89/// explicit presence type instead of relying on `Default`.
90///
91#[cfg(feature = "diagnostics")]
92#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
93pub struct QueryExecutionAttribution {
94    pub compile_local_instructions: u64,
95    pub plan_lookup_local_instructions: u64,
96    pub executor_invocation_local_instructions: u64,
97    pub response_finalization_local_instructions: u64,
98    pub runtime_local_instructions: u64,
99    pub finalize_local_instructions: u64,
100    pub direct_data_row_scan_local_instructions: u64,
101    pub direct_data_row_key_stream_local_instructions: u64,
102    pub direct_data_row_row_read_local_instructions: u64,
103    pub direct_data_row_key_encode_local_instructions: u64,
104    pub direct_data_row_store_get_local_instructions: u64,
105    pub direct_data_row_order_window_local_instructions: u64,
106    pub direct_data_row_page_window_local_instructions: u64,
107    pub grouped_stream_local_instructions: u64,
108    pub grouped_fold_local_instructions: u64,
109    pub grouped_finalize_local_instructions: u64,
110    pub grouped_count_borrowed_hash_computations: u64,
111    pub grouped_count_bucket_candidate_checks: u64,
112    pub grouped_count_existing_group_hits: u64,
113    pub grouped_count_new_group_inserts: u64,
114    pub grouped_count_row_materialization_local_instructions: u64,
115    pub grouped_count_group_lookup_local_instructions: u64,
116    pub grouped_count_existing_group_update_local_instructions: u64,
117    pub grouped_count_new_group_insert_local_instructions: u64,
118    pub response_decode_local_instructions: u64,
119    pub execute_local_instructions: u64,
120    pub total_local_instructions: u64,
121    pub shared_query_plan_cache_hits: u64,
122    pub shared_query_plan_cache_misses: u64,
123}
124
125#[cfg(feature = "diagnostics")]
126#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
127struct QueryExecutePhaseAttribution {
128    executor_invocation_local_instructions: u64,
129    response_finalization_local_instructions: u64,
130    runtime_local_instructions: u64,
131    finalize_local_instructions: u64,
132    direct_data_row_scan_local_instructions: u64,
133    direct_data_row_key_stream_local_instructions: u64,
134    direct_data_row_row_read_local_instructions: u64,
135    direct_data_row_key_encode_local_instructions: u64,
136    direct_data_row_store_get_local_instructions: u64,
137    direct_data_row_order_window_local_instructions: u64,
138    direct_data_row_page_window_local_instructions: u64,
139    grouped_stream_local_instructions: u64,
140    grouped_fold_local_instructions: u64,
141    grouped_finalize_local_instructions: u64,
142    grouped_count: GroupedCountAttribution,
143}
144
145impl<C: CanisterKind> DbSession<C> {
146    #[cfg(feature = "diagnostics")]
147    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
148        QueryExecutePhaseAttribution {
149            executor_invocation_local_instructions: 0,
150            response_finalization_local_instructions: 0,
151            runtime_local_instructions: 0,
152            finalize_local_instructions: 0,
153            direct_data_row_scan_local_instructions: 0,
154            direct_data_row_key_stream_local_instructions: 0,
155            direct_data_row_row_read_local_instructions: 0,
156            direct_data_row_key_encode_local_instructions: 0,
157            direct_data_row_store_get_local_instructions: 0,
158            direct_data_row_order_window_local_instructions: 0,
159            direct_data_row_page_window_local_instructions: 0,
160            grouped_stream_local_instructions: 0,
161            grouped_fold_local_instructions: 0,
162            grouped_finalize_local_instructions: 0,
163            grouped_count: GroupedCountAttribution::none(),
164        }
165    }
166
167    #[cfg(feature = "diagnostics")]
168    const fn scalar_query_execute_phase_attribution(
169        phase: ScalarExecutePhaseAttribution,
170        executor_invocation_local_instructions: u64,
171    ) -> QueryExecutePhaseAttribution {
172        QueryExecutePhaseAttribution {
173            executor_invocation_local_instructions,
174            response_finalization_local_instructions: 0,
175            runtime_local_instructions: phase.runtime_local_instructions,
176            finalize_local_instructions: phase.finalize_local_instructions,
177            direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
178            direct_data_row_key_stream_local_instructions: phase
179                .direct_data_row_key_stream_local_instructions,
180            direct_data_row_row_read_local_instructions: phase
181                .direct_data_row_row_read_local_instructions,
182            direct_data_row_key_encode_local_instructions: phase
183                .direct_data_row_key_encode_local_instructions,
184            direct_data_row_store_get_local_instructions: phase
185                .direct_data_row_store_get_local_instructions,
186            direct_data_row_order_window_local_instructions: phase
187                .direct_data_row_order_window_local_instructions,
188            direct_data_row_page_window_local_instructions: phase
189                .direct_data_row_page_window_local_instructions,
190            grouped_stream_local_instructions: 0,
191            grouped_fold_local_instructions: 0,
192            grouped_finalize_local_instructions: 0,
193            grouped_count: GroupedCountAttribution::none(),
194        }
195    }
196
197    #[cfg(feature = "diagnostics")]
198    const fn grouped_query_execute_phase_attribution(
199        phase: GroupedExecutePhaseAttribution,
200        executor_invocation_local_instructions: u64,
201        response_finalization_local_instructions: u64,
202    ) -> QueryExecutePhaseAttribution {
203        QueryExecutePhaseAttribution {
204            executor_invocation_local_instructions,
205            response_finalization_local_instructions,
206            runtime_local_instructions: phase
207                .stream_local_instructions
208                .saturating_add(phase.fold_local_instructions),
209            finalize_local_instructions: phase.finalize_local_instructions,
210            direct_data_row_scan_local_instructions: 0,
211            direct_data_row_key_stream_local_instructions: 0,
212            direct_data_row_row_read_local_instructions: 0,
213            direct_data_row_key_encode_local_instructions: 0,
214            direct_data_row_store_get_local_instructions: 0,
215            direct_data_row_order_window_local_instructions: 0,
216            direct_data_row_page_window_local_instructions: 0,
217            grouped_stream_local_instructions: phase.stream_local_instructions,
218            grouped_fold_local_instructions: phase.fold_local_instructions,
219            grouped_finalize_local_instructions: phase.finalize_local_instructions,
220            grouped_count: phase.grouped_count,
221        }
222    }
223
224    // Compile one typed query using only the indexes currently visible for the
225    // query's recovered store.
226    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
227        &self,
228        query: &Query<E>,
229    ) -> Result<CompiledQuery<E>, QueryError>
230    where
231        E: EntityKind<Canister = C>,
232    {
233        self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
234    }
235
236    // Build one logical planned-query shell using only the indexes currently
237    // visible for the query's recovered store.
238    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
239        &self,
240        query: &Query<E>,
241    ) -> Result<PlannedQuery<E>, QueryError>
242    where
243        E: EntityKind<Canister = C>,
244    {
245        self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
246    }
247
248    // Reuse the shared prepared-plan cache for read-only query diagnostics
249    // that only need the logical access-planned contract.
250    fn cached_logical_query_plan<E>(
251        &self,
252        query: &Query<E>,
253    ) -> Result<AccessPlannedQuery, QueryError>
254    where
255        E: EntityKind<Canister = C>,
256    {
257        let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
258
259        Ok(prepared_plan.logical_plan().clone())
260    }
261
262    // Reuse the same cached logical plan as execution explain, then freeze the
263    // explain-only access-choice facts for the effective session-visible index
264    // slice before route facts are assembled.
265    fn cached_finalized_execution_explain_plan<E>(
266        &self,
267        query: &Query<E>,
268        visible_indexes: &VisibleIndexes<'_>,
269    ) -> Result<(AccessPlannedQuery, QueryPlanCacheAttribution), QueryError>
270    where
271        E: EntityKind<Canister = C>,
272    {
273        let (prepared_plan, cache_attribution) =
274            self.cached_shared_query_plan_for_entity::<E>(query)?;
275        let mut plan = prepared_plan.logical_plan().clone();
276
277        plan.finalize_access_choice_for_model_with_indexes(
278            query.structural().model(),
279            visible_indexes.as_slice(),
280        );
281
282        Ok((plan, cache_attribution))
283    }
284
285    // Project one logical explain payload using only planner-visible indexes.
286    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
287        &self,
288        query: &Query<E>,
289    ) -> Result<ExplainPlan, QueryError>
290    where
291        E: EntityKind<Canister = C>,
292    {
293        let plan = self.cached_logical_query_plan(query)?;
294
295        Ok(plan.explain())
296    }
297
298    // Hash one typed query plan using only the indexes currently visible for
299    // the query's recovered store.
300    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
301        &self,
302        query: &Query<E>,
303    ) -> Result<String, QueryError>
304    where
305        E: EntityKind<Canister = C>,
306    {
307        let plan = self.cached_logical_query_plan(query)?;
308
309        Ok(plan.fingerprint().to_string())
310    }
311
312    // Explain one load execution shape using only planner-visible
313    // indexes from the recovered store state.
314    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
315        &self,
316        query: &Query<E>,
317    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
318    where
319        E: EntityValue + EntityKind<Canister = C>,
320    {
321        self.with_query_visible_indexes(query, |query, visible_indexes| {
322            let (plan, _) =
323                self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
324
325            query
326                .structural()
327                .explain_execution_descriptor_from_plan(&plan)
328        })
329    }
330
331    // Render one load execution descriptor plus route diagnostics using
332    // only planner-visible indexes from the recovered store state.
333    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
334        &self,
335        query: &Query<E>,
336    ) -> Result<String, QueryError>
337    where
338        E: EntityValue + EntityKind<Canister = C>,
339    {
340        self.with_query_visible_indexes(query, |query, visible_indexes| {
341            let (plan, cache_attribution) =
342                self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
343
344            query
345                .structural()
346                .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
347                    &plan,
348                    Some(query_plan_cache_reuse_event(cache_attribution)),
349                    |_| {},
350                )
351                .map(|diagnostics| diagnostics.render_text_verbose())
352        })
353    }
354
355    // Explain one prepared fluent aggregate terminal using only
356    // planner-visible indexes from the recovered store state.
357    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
358        &self,
359        query: &Query<E>,
360        strategy: &S,
361    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
362    where
363        E: EntityValue + EntityKind<Canister = C>,
364        S: PreparedFluentAggregateExplainStrategy,
365    {
366        self.with_query_visible_indexes(query, |query, visible_indexes| {
367            query
368                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
369        })
370    }
371
372    // Explain one `bytes_by(field)` terminal using only planner-visible
373    // indexes from the recovered store state.
374    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
375        &self,
376        query: &Query<E>,
377        target_field: &str,
378    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
379    where
380        E: EntityValue + EntityKind<Canister = C>,
381    {
382        self.with_query_visible_indexes(query, |query, visible_indexes| {
383            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
384        })
385    }
386
387    // Explain one prepared fluent projection terminal using only
388    // planner-visible indexes from the recovered store state.
389    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
390        &self,
391        query: &Query<E>,
392        strategy: &PreparedFluentProjectionStrategy,
393    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
394    where
395        E: EntityValue + EntityKind<Canister = C>,
396    {
397        self.with_query_visible_indexes(query, |query, visible_indexes| {
398            query.explain_prepared_projection_terminal_with_visible_indexes(
399                visible_indexes,
400                strategy,
401            )
402        })
403    }
404
405    // Validate that one execution strategy is admissible for scalar paged load
406    // execution and fail closed on grouped/primary-key-only routes.
407    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
408        match family {
409            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
410                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
411            )),
412            ExecutionFamily::Ordered => Ok(()),
413            ExecutionFamily::Grouped => Err(QueryError::invariant(
414                "grouped queries execute via execute(), not page().execute()",
415            )),
416        }
417    }
418
419    // Validate that one execution strategy is admissible for the grouped
420    // execution surface.
421    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
422        match family {
423            ExecutionFamily::Grouped => Ok(()),
424            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
425                "grouped execution requires grouped logical plans",
426            )),
427        }
428    }
429
430    /// Execute one scalar load/delete query and return materialized response rows.
431    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
432    where
433        E: PersistedRow<Canister = C> + EntityValue,
434    {
435        // Phase 1: compile typed intent into one prepared execution-plan contract.
436        let mode = query.mode();
437        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
438
439        // Phase 2: delegate execution to the shared compiled-plan entry path.
440        self.execute_query_dyn(mode, plan)
441    }
442
443    /// Execute one typed query while reporting the compile/execute split at
444    /// the shared fluent query seam.
445    #[cfg(feature = "diagnostics")]
446    #[doc(hidden)]
447    #[expect(
448        clippy::too_many_lines,
449        reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
450    )]
451    #[expect(
452        clippy::needless_update,
453        reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
454    )]
455    pub fn execute_query_result_with_attribution<E>(
456        &self,
457        query: &Query<E>,
458    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
459    where
460        E: PersistedRow<Canister = C> + EntityValue,
461    {
462        // Phase 1: measure compile work at the typed/fluent boundary,
463        // including the shared lower query-plan cache lookup/build exactly
464        // once. This preserves honest hit/miss attribution without
465        // double-building plans on one-shot cache misses.
466        let (plan_lookup_local_instructions, plan_and_cache) =
467            measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
468        let (plan, cache_attribution) = plan_and_cache?;
469        let compile_local_instructions = plan_lookup_local_instructions;
470
471        // Phase 2: execute one query result using the prepared plan produced
472        // by the compile/cache boundary above.
473        let result =
474            || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
475                if query.has_grouping() {
476                    let (executor_invocation_local_instructions, grouped_page) =
477                        measure_query_stage(|| {
478                            self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
479                                executor
480                                    .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
481                                        plan, cursor,
482                                    )
483                            })
484                        });
485                    let (result, trace, phase_attribution) = grouped_page?;
486                    let (response_finalization_local_instructions, grouped) =
487                        measure_query_stage(|| {
488                            finalize_structural_grouped_projection_result(result, trace)
489                        });
490                    let grouped = grouped?;
491
492                    Ok((
493                        LoadQueryResult::Grouped(grouped),
494                        Self::grouped_query_execute_phase_attribution(
495                            phase_attribution,
496                            executor_invocation_local_instructions,
497                            response_finalization_local_instructions,
498                        ),
499                        0,
500                    ))
501                } else {
502                    match query.mode() {
503                        QueryMode::Load(_) => {
504                            let (executor_invocation_local_instructions, executed) =
505                                measure_query_stage(|| {
506                                    self.load_executor::<E>()
507                                        .execute_with_phase_attribution(plan)
508                                        .map_err(QueryError::execute)
509                                });
510                            let (rows, phase_attribution, response_decode_local_instructions) =
511                                executed?;
512
513                            Ok((
514                                LoadQueryResult::Rows(rows),
515                                Self::scalar_query_execute_phase_attribution(
516                                    phase_attribution,
517                                    executor_invocation_local_instructions,
518                                ),
519                                response_decode_local_instructions,
520                            ))
521                        }
522                        QueryMode::Delete(_) => {
523                            let (executor_invocation_local_instructions, result) =
524                                measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
525                            let result = result?;
526
527                            Ok((
528                                LoadQueryResult::Rows(result),
529                                QueryExecutePhaseAttribution {
530                                    executor_invocation_local_instructions,
531                                    ..Self::empty_query_execute_phase_attribution()
532                                },
533                                0,
534                            ))
535                        }
536                    }
537                }
538            }();
539        let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
540        let execute_local_instructions = execute_phase_attribution
541            .executor_invocation_local_instructions
542            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
543        let total_local_instructions =
544            compile_local_instructions.saturating_add(execute_local_instructions);
545
546        Ok((
547            result,
548            QueryExecutionAttribution {
549                compile_local_instructions,
550                plan_lookup_local_instructions,
551                executor_invocation_local_instructions: execute_phase_attribution
552                    .executor_invocation_local_instructions,
553                response_finalization_local_instructions: execute_phase_attribution
554                    .response_finalization_local_instructions,
555                runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
556                finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
557                direct_data_row_scan_local_instructions: execute_phase_attribution
558                    .direct_data_row_scan_local_instructions,
559                direct_data_row_key_stream_local_instructions: execute_phase_attribution
560                    .direct_data_row_key_stream_local_instructions,
561                direct_data_row_row_read_local_instructions: execute_phase_attribution
562                    .direct_data_row_row_read_local_instructions,
563                direct_data_row_key_encode_local_instructions: execute_phase_attribution
564                    .direct_data_row_key_encode_local_instructions,
565                direct_data_row_store_get_local_instructions: execute_phase_attribution
566                    .direct_data_row_store_get_local_instructions,
567                direct_data_row_order_window_local_instructions: execute_phase_attribution
568                    .direct_data_row_order_window_local_instructions,
569                direct_data_row_page_window_local_instructions: execute_phase_attribution
570                    .direct_data_row_page_window_local_instructions,
571                grouped_stream_local_instructions: execute_phase_attribution
572                    .grouped_stream_local_instructions,
573                grouped_fold_local_instructions: execute_phase_attribution
574                    .grouped_fold_local_instructions,
575                grouped_finalize_local_instructions: execute_phase_attribution
576                    .grouped_finalize_local_instructions,
577                grouped_count_borrowed_hash_computations: execute_phase_attribution
578                    .grouped_count
579                    .borrowed_hash_computations,
580                grouped_count_bucket_candidate_checks: execute_phase_attribution
581                    .grouped_count
582                    .bucket_candidate_checks,
583                grouped_count_existing_group_hits: execute_phase_attribution
584                    .grouped_count
585                    .existing_group_hits,
586                grouped_count_new_group_inserts: execute_phase_attribution
587                    .grouped_count
588                    .new_group_inserts,
589                grouped_count_row_materialization_local_instructions: execute_phase_attribution
590                    .grouped_count
591                    .row_materialization_local_instructions,
592                grouped_count_group_lookup_local_instructions: execute_phase_attribution
593                    .grouped_count
594                    .group_lookup_local_instructions,
595                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
596                    .grouped_count
597                    .existing_group_update_local_instructions,
598                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
599                    .grouped_count
600                    .new_group_insert_local_instructions,
601                response_decode_local_instructions,
602                execute_local_instructions,
603                total_local_instructions,
604                shared_query_plan_cache_hits: cache_attribution.hits,
605                shared_query_plan_cache_misses: cache_attribution.misses,
606                ..QueryExecutionAttribution::default()
607            },
608        ))
609    }
610
611    // Execute one typed query through the unified row/grouped result surface so
612    // higher layers do not need to branch on grouped shape themselves.
613    #[doc(hidden)]
614    pub fn execute_query_result<E>(
615        &self,
616        query: &Query<E>,
617    ) -> Result<LoadQueryResult<E>, QueryError>
618    where
619        E: PersistedRow<Canister = C> + EntityValue,
620    {
621        if query.has_grouping() {
622            return self
623                .execute_grouped(query, None)
624                .map(LoadQueryResult::Grouped);
625        }
626
627        self.execute_query(query).map(LoadQueryResult::Rows)
628    }
629
630    /// Execute one typed delete query and return only the affected-row count.
631    #[doc(hidden)]
632    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
633    where
634        E: PersistedRow<Canister = C> + EntityValue,
635    {
636        // Phase 1: fail closed if the caller routes a non-delete query here.
637        if !query.mode().is_delete() {
638            return Err(QueryError::unsupported_query(
639                "delete count execution requires delete query mode",
640            ));
641        }
642
643        // Phase 2: resolve one cached prepared execution-plan contract directly
644        // from the shared lower boundary instead of rebuilding it through the
645        // typed compiled-query wrapper.
646        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
647
648        // Phase 3: execute the shared delete core while skipping response-row materialization.
649        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
650            .map_err(QueryError::execute)
651    }
652
653    /// Execute one scalar query from one pre-built prepared execution contract.
654    ///
655    /// This is the shared compiled-plan entry boundary used by the typed
656    /// `execute_query(...)` surface and adjacent query execution facades.
657    pub(in crate::db) fn execute_query_dyn<E>(
658        &self,
659        mode: QueryMode,
660        plan: PreparedExecutionPlan<E>,
661    ) -> Result<EntityResponse<E>, QueryError>
662    where
663        E: PersistedRow<Canister = C> + EntityValue,
664    {
665        let result = match mode {
666            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
667            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
668        };
669
670        result.map_err(QueryError::execute)
671    }
672
673    // Shared load-query terminal wrapper: build plan, run under metrics, map
674    // execution errors into query-facing errors.
675    pub(in crate::db) fn execute_load_query_with<E, T>(
676        &self,
677        query: &Query<E>,
678        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
679    ) -> Result<T, QueryError>
680    where
681        E: PersistedRow<Canister = C> + EntityValue,
682    {
683        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
684
685        self.with_metrics(|| op(self.load_executor::<E>(), plan))
686            .map_err(QueryError::execute)
687    }
688
689    // Execute one scalar terminal boundary and keep the executor-specific
690    // request/output types contained inside the session adapter.
691    fn execute_scalar_terminal_boundary<E>(
692        &self,
693        query: &Query<E>,
694        request: ScalarTerminalBoundaryRequest,
695    ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
696    where
697        E: PersistedRow<Canister = C> + EntityValue,
698    {
699        self.execute_load_query_with(query, move |load, plan| {
700            load.execute_scalar_terminal_request(plan, request)
701        })
702    }
703
704    // Execute one projection terminal boundary and keep field projection
705    // executor details out of fluent query modules.
706    fn execute_scalar_projection_boundary<E>(
707        &self,
708        query: &Query<E>,
709        target_field: FieldSlot,
710        request: ScalarProjectionBoundaryRequest,
711    ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
712    where
713        E: PersistedRow<Canister = C> + EntityValue,
714    {
715        self.execute_load_query_with(query, move |load, plan| {
716            load.execute_scalar_projection_boundary(plan, target_field, request)
717        })
718    }
719
720    // Execute one fluent count/exists terminal through a query-owned result
721    // shape so fluent terminals do not import executor aggregate outputs.
722    pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
723        &self,
724        query: &Query<E>,
725        strategy: PreparedFluentExistingRowsTerminalStrategy,
726    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
727    where
728        E: PersistedRow<Canister = C> + EntityValue,
729    {
730        let (request, output_shape) = strategy.into_executor_request();
731        let output = self.execute_scalar_terminal_boundary(query, request)?;
732
733        match output_shape {
734            PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => output
735                .into_count()
736                .map(FluentScalarTerminalOutput::Count)
737                .map_err(QueryError::execute),
738            PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => output
739                .into_exists()
740                .map(FluentScalarTerminalOutput::Exists)
741                .map_err(QueryError::execute),
742        }
743    }
744
745    // Execute one fluent id/extrema terminal through a query-owned result
746    // shape after the session adapter has decoded storage keys into typed ids.
747    pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
748        &self,
749        query: &Query<E>,
750        strategy: PreparedFluentScalarTerminalStrategy,
751    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
752    where
753        E: PersistedRow<Canister = C> + EntityValue,
754    {
755        let request = strategy.into_executor_request();
756
757        self.execute_scalar_terminal_boundary(query, request)?
758            .into_id::<E>()
759            .map(FluentScalarTerminalOutput::Id)
760            .map_err(QueryError::execute)
761    }
762
763    // Execute one fluent order-sensitive terminal through the session adapter.
764    // The min/max pair request remains distinguished because it returns two ids.
765    pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
766        &self,
767        query: &Query<E>,
768        strategy: PreparedFluentOrderSensitiveTerminalStrategy,
769    ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
770    where
771        E: PersistedRow<Canister = C> + EntityValue,
772    {
773        let (request, returns_id_pair) = strategy.into_executor_request();
774        let output = self.execute_scalar_terminal_boundary(query, request)?;
775
776        if returns_id_pair {
777            return output
778                .into_id_pair::<E>()
779                .map(FluentScalarTerminalOutput::IdPair)
780                .map_err(QueryError::execute);
781        }
782
783        output
784            .into_id::<E>()
785            .map(FluentScalarTerminalOutput::Id)
786            .map_err(QueryError::execute)
787    }
788
789    // Execute one fluent numeric-field terminal through the session-owned
790    // request conversion layer.
791    pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
792        &self,
793        query: &Query<E>,
794        strategy: PreparedFluentNumericFieldStrategy,
795    ) -> Result<Option<Decimal>, QueryError>
796    where
797        E: PersistedRow<Canister = C> + EntityValue,
798    {
799        let (target_field, request) = strategy.into_executor_request();
800
801        self.execute_load_query_with(query, move |load, plan| {
802            load.execute_numeric_field_boundary(plan, target_field, request)
803        })
804    }
805
806    // Execute one fluent projection terminal through a query-owned output
807    // shape after the session adapter has decoded any data keys into typed ids.
808    pub(in crate::db) fn execute_fluent_projection_terminal<E>(
809        &self,
810        query: &Query<E>,
811        strategy: PreparedFluentProjectionStrategy,
812    ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
813    where
814        E: PersistedRow<Canister = C> + EntityValue,
815    {
816        let (target_field, request, output_shape) = strategy.into_executor_request();
817        let output = self.execute_scalar_projection_boundary(query, target_field, request)?;
818
819        match output_shape {
820            PreparedFluentProjectionRuntimeRequest::Values
821            | PreparedFluentProjectionRuntimeRequest::DistinctValues => output
822                .into_values()
823                .map(FluentProjectionTerminalOutput::Values)
824                .map_err(QueryError::execute),
825            PreparedFluentProjectionRuntimeRequest::CountDistinct => output
826                .into_count()
827                .map(FluentProjectionTerminalOutput::Count)
828                .map_err(QueryError::execute),
829            PreparedFluentProjectionRuntimeRequest::ValuesWithIds => output
830                .into_values_with_ids::<E>()
831                .map(FluentProjectionTerminalOutput::ValuesWithIds)
832                .map_err(QueryError::execute),
833            PreparedFluentProjectionRuntimeRequest::TerminalValue { .. } => output
834                .into_terminal_value()
835                .map(FluentProjectionTerminalOutput::TerminalValue)
836                .map_err(QueryError::execute),
837        }
838    }
839
840    // Execute the fluent `bytes()` terminal without leaking `LoadExecutor`
841    // closure assembly into query fluent code.
842    pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
843    where
844        E: PersistedRow<Canister = C> + EntityValue,
845    {
846        self.execute_load_query_with(query, |load, plan| load.bytes(plan))
847    }
848
849    // Execute the fluent `bytes_by(field)` terminal at the session boundary.
850    pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
851        &self,
852        query: &Query<E>,
853        target_slot: FieldSlot,
854    ) -> Result<u64, QueryError>
855    where
856        E: PersistedRow<Canister = C> + EntityValue,
857    {
858        self.execute_load_query_with(query, move |load, plan| {
859            load.bytes_by_slot(plan, target_slot)
860        })
861    }
862
863    // Execute the fluent `take(k)` terminal at the session boundary.
864    pub(in crate::db) fn execute_fluent_take<E>(
865        &self,
866        query: &Query<E>,
867        take_count: u32,
868    ) -> Result<EntityResponse<E>, QueryError>
869    where
870        E: PersistedRow<Canister = C> + EntityValue,
871    {
872        self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
873    }
874
875    // Execute one row-returning fluent top/bottom-k terminal at the session boundary.
876    pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
877        &self,
878        query: &Query<E>,
879        target_slot: FieldSlot,
880        take_count: u32,
881        descending: bool,
882    ) -> Result<EntityResponse<E>, QueryError>
883    where
884        E: PersistedRow<Canister = C> + EntityValue,
885    {
886        self.execute_load_query_with(query, move |load, plan| {
887            if descending {
888                load.top_k_by_slot(plan, target_slot, take_count)
889            } else {
890                load.bottom_k_by_slot(plan, target_slot, take_count)
891            }
892        })
893    }
894
895    // Execute one value-returning fluent top/bottom-k terminal at the session boundary.
896    pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
897        &self,
898        query: &Query<E>,
899        target_slot: FieldSlot,
900        take_count: u32,
901        descending: bool,
902    ) -> Result<Vec<Value>, QueryError>
903    where
904        E: PersistedRow<Canister = C> + EntityValue,
905    {
906        self.execute_load_query_with(query, move |load, plan| {
907            if descending {
908                load.top_k_by_values_slot(plan, target_slot, take_count)
909            } else {
910                load.bottom_k_by_values_slot(plan, target_slot, take_count)
911            }
912        })
913    }
914
915    // Execute one id/value-returning fluent top/bottom-k terminal at the session boundary.
916    pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
917        &self,
918        query: &Query<E>,
919        target_slot: FieldSlot,
920        take_count: u32,
921        descending: bool,
922    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
923    where
924        E: PersistedRow<Canister = C> + EntityValue,
925    {
926        self.execute_load_query_with(query, move |load, plan| {
927            if descending {
928                load.top_k_by_with_ids_slot(plan, target_slot, take_count)
929            } else {
930                load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
931            }
932        })
933    }
934
935    /// Build one trace payload for a query without executing it.
936    ///
937    /// This lightweight surface is intended for developer diagnostics:
938    /// plan hash, access strategy summary, and planner/executor route shape.
939    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
940    where
941        E: EntityKind<Canister = C>,
942    {
943        let (prepared_plan, cache_attribution) =
944            self.cached_prepared_query_plan_for_entity::<E>(query)?;
945        let logical_plan = prepared_plan.logical_plan();
946        let explain = logical_plan.explain();
947        let plan_hash = logical_plan.fingerprint().to_string();
948        let executable_access = prepared_plan.access().executable_contract();
949        let access_strategy = summarize_executable_access_plan(&executable_access);
950        let execution_family = match query.mode() {
951            QueryMode::Load(_) => Some(trace_execution_family_from_executor(
952                prepared_plan
953                    .execution_family()
954                    .map_err(QueryError::execute)?,
955            )),
956            QueryMode::Delete(_) => None,
957        };
958        let reuse = query_plan_cache_reuse_event(cache_attribution);
959
960        Ok(QueryTracePlan::new(
961            plan_hash,
962            access_strategy,
963            execution_family,
964            reuse,
965            explain,
966        ))
967    }
968
969    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
970    pub(crate) fn execute_load_query_paged_with_trace<E>(
971        &self,
972        query: &Query<E>,
973        cursor_token: Option<&str>,
974    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
975    where
976        E: PersistedRow<Canister = C> + EntityValue,
977    {
978        // Phase 1: build/validate prepared execution plan and reject grouped plans.
979        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
980        Self::ensure_scalar_paged_execution_family(
981            plan.execution_family().map_err(QueryError::execute)?,
982        )?;
983
984        // Phase 2: decode external cursor token and validate it against plan surface.
985        let cursor_bytes = decode_optional_cursor_token(cursor_token)
986            .map_err(QueryError::from_cursor_plan_error)?;
987        let cursor = plan
988            .prepare_cursor(cursor_bytes.as_deref())
989            .map_err(query_error_from_executor_plan_error)?;
990
991        // Phase 3: execute one traced page and encode outbound continuation token.
992        let (page, trace) = self
993            .with_metrics(|| {
994                self.load_executor::<E>()
995                    .execute_paged_with_cursor_traced(plan, cursor)
996            })
997            .map_err(QueryError::execute)?;
998        finalize_scalar_paged_execution(page, trace)
999    }
1000
1001    /// Execute one grouped query page with optional grouped continuation cursor.
1002    ///
1003    /// This is the explicit grouped execution boundary; scalar load APIs reject
1004    /// grouped plans to preserve scalar response contracts.
1005    pub(in crate::db) fn execute_grouped<E>(
1006        &self,
1007        query: &Query<E>,
1008        cursor_token: Option<&str>,
1009    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1010    where
1011        E: PersistedRow<Canister = C> + EntityValue,
1012    {
1013        // Phase 1: build the prepared execution plan once from the typed query.
1014        let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1015
1016        // Phase 2: reuse the shared prepared grouped execution path and then
1017        // finalize the outward grouped payload at the session boundary.
1018        let (result, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1019
1020        finalize_structural_grouped_projection_result(result, trace)
1021    }
1022
1023    // Execute one grouped prepared plan page with optional grouped cursor
1024    // while letting the caller choose the final grouped-runtime dispatch.
1025    fn execute_grouped_plan_with<E, T>(
1026        &self,
1027        plan: PreparedExecutionPlan<E>,
1028        cursor_token: Option<&str>,
1029        op: impl FnOnce(
1030            LoadExecutor<E>,
1031            PreparedExecutionPlan<E>,
1032            crate::db::cursor::GroupedPlannedCursor,
1033        ) -> Result<T, InternalError>,
1034    ) -> Result<T, QueryError>
1035    where
1036        E: PersistedRow<Canister = C> + EntityValue,
1037    {
1038        // Phase 1: validate the prepared plan shape before decoding cursors.
1039        Self::ensure_grouped_execution_family(
1040            plan.execution_family().map_err(QueryError::execute)?,
1041        )?;
1042
1043        // Phase 2: decode external grouped cursor token and validate against plan.
1044        let cursor = decode_optional_grouped_cursor_token(cursor_token)
1045            .map_err(QueryError::from_cursor_plan_error)?;
1046        let cursor = plan
1047            .prepare_grouped_cursor_token(cursor)
1048            .map_err(query_error_from_executor_plan_error)?;
1049
1050        // Phase 3: execute one grouped page while preserving the structural
1051        // grouped cursor payload for whichever outward cursor format the caller needs.
1052        self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1053            .map_err(QueryError::execute)
1054    }
1055
1056    // Execute one grouped prepared plan result with optional grouped cursor.
1057    fn execute_grouped_plan_with_trace<E>(
1058        &self,
1059        plan: PreparedExecutionPlan<E>,
1060        cursor_token: Option<&str>,
1061    ) -> Result<(StructuralGroupedProjectionResult, Option<ExecutionTrace>), QueryError>
1062    where
1063        E: PersistedRow<Canister = C> + EntityValue,
1064    {
1065        self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1066            executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1067        })
1068    }
1069}