Skip to main content

icydb_core/db/session/query/
execution.rs

1//! Module: db::session::query::execution
2//! Responsibility: canonical query execution dispatch and executor error mapping.
3//! Does not own: diagnostics attribution, cursor decoding, fluent adaptation, or explain surfaces.
4//! Boundary: maps prepared plans into executor calls and query-facing response/error types.
5
6#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9    db::{
10        DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11        cursor::CursorPlanError,
12        diagnostics::ExecutionTrace,
13        executor::{
14            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
15            StructuralGroupedProjectionResult,
16        },
17        query::plan::QueryMode,
18        session::finalize_structural_grouped_projection_result,
19    },
20    error::InternalError,
21    traits::{CanisterKind, EntityValue},
22};
23
24///
25/// PreparedQueryExecutionOutcome
26///
27/// PreparedQueryExecutionOutcome is the private shared result shape for one
28/// prepared query execution. Normal execution and diagnostics attribution use
29/// it to share scalar/grouped/delete dispatch without exposing executor DTOs
30/// outside the session query module.
31///
32#[expect(
33    clippy::large_enum_variant,
34    reason = "the grouped execution result stays inline to avoid adding a boxed allocation on query execution paths"
35)]
36pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
37where
38    E: PersistedRow,
39{
40    Scalar {
41        rows: EntityResponse<E>,
42        #[cfg(feature = "diagnostics")]
43        phase: Option<ScalarExecutePhaseAttribution>,
44        #[cfg(feature = "diagnostics")]
45        response_decode_local_instructions: u64,
46    },
47    Grouped {
48        result: StructuralGroupedProjectionResult,
49        trace: Option<ExecutionTrace>,
50        #[cfg(feature = "diagnostics")]
51        phase: Option<GroupedExecutePhaseAttribution>,
52    },
53    Delete {
54        rows: EntityResponse<E>,
55    },
56    DeleteCount {
57        row_count: u32,
58    },
59}
60
61///
62/// PreparedQueryExecutionOutput
63///
64/// PreparedQueryExecutionOutput tells the shared prepared-plan seam whether a
65/// delete query should materialize deleted rows or use the count-only executor
66/// terminal. The mode exists so `execute_delete_count` can share the same
67/// session dispatch core without forcing row allocation.
68///
69
70#[derive(Clone, Copy, Debug, Eq, PartialEq)]
71pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
72    Rows,
73    DeleteCount,
74}
75
76// Convert executor plan-surface failures at the session boundary so query error
77// types do not import executor-owned error enums.
78pub(in crate::db::session) fn query_error_from_executor_plan_error(
79    err: ExecutorPlanError,
80) -> QueryError {
81    match err {
82        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
83    }
84}
85
86impl<C: CanisterKind> DbSession<C> {
87    // Validate that one execution strategy is admissible for scalar paged load
88    // execution and fail closed on grouped/primary-key-only routes.
89    pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
90        family: ExecutionFamily,
91    ) -> Result<(), QueryError> {
92        match family {
93            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
94                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
95            )),
96            ExecutionFamily::Ordered => Ok(()),
97            ExecutionFamily::Grouped => Err(QueryError::invariant(
98                "grouped queries execute via execute(), not page().execute()",
99            )),
100        }
101    }
102
103    // Validate that one execution strategy is admissible for the grouped
104    // execution surface.
105    pub(in crate::db::session::query) fn ensure_grouped_execution_family(
106        family: ExecutionFamily,
107    ) -> Result<(), QueryError> {
108        match family {
109            ExecutionFamily::Grouped => Ok(()),
110            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
111                "grouped execution requires grouped logical plans",
112            )),
113        }
114    }
115
116    /// Execute one scalar load/delete query and return materialized response rows.
117    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
118    where
119        E: PersistedRow<Canister = C> + EntityValue,
120    {
121        self.execute_query_result(query)
122            .and_then(LoadQueryResult::into_rows)
123    }
124
125    // Execute one typed query through the unified row/grouped result surface so
126    // higher layers do not need to branch on grouped shape themselves.
127    #[doc(hidden)]
128    pub fn execute_query_result<E>(
129        &self,
130        query: &Query<E>,
131    ) -> Result<LoadQueryResult<E>, QueryError>
132    where
133        E: PersistedRow<Canister = C> + EntityValue,
134    {
135        // Phase 1: compile typed intent into one prepared execution-plan
136        // contract shared by scalar, grouped, and delete execution.
137        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
138
139        // Phase 2: execute through the canonical prepared-plan seam and adapt
140        // the private executor outcome into the public session result shape.
141        self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
142            .and_then(Self::load_result_from_prepared_outcome)
143    }
144
145    /// Execute one typed delete query and return only the affected-row count.
146    #[doc(hidden)]
147    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
148    where
149        E: PersistedRow<Canister = C> + EntityValue,
150    {
151        // Phase 1: fail closed if the caller routes a non-delete query here.
152        if !query.mode().is_delete() {
153            return Err(QueryError::unsupported_query(
154                "delete count execution requires delete query mode",
155            ));
156        }
157
158        // Phase 2: resolve one cached prepared execution-plan contract directly
159        // from the shared lower boundary instead of rebuilding it through the
160        // typed compiled-query wrapper.
161        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
162
163        // Phase 3: execute through the shared prepared-plan seam while keeping
164        // the count-only delete terminal that skips response-row materialization.
165        match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
166            PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
167            PreparedQueryExecutionOutcome::Scalar { .. }
168            | PreparedQueryExecutionOutcome::Grouped { .. }
169            | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant(
170                "delete count execution returned non-count result",
171            )),
172        }
173    }
174
175    // Execute one prepared plan through the shared scalar/grouped/delete
176    // dispatch. Diagnostics can request phase-attribution executor entrypoints;
177    // normal execution keeps the existing non-attribution calls.
178    pub(in crate::db::session::query) fn execute_prepared<E>(
179        &self,
180        plan: PreparedExecutionPlan<E>,
181        collect_attribution: bool,
182        output: PreparedQueryExecutionOutput,
183    ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
184    where
185        E: PersistedRow<Canister = C> + EntityValue,
186    {
187        #[cfg(not(feature = "diagnostics"))]
188        let _ = collect_attribution;
189
190        if plan.is_grouped() {
191            if output == PreparedQueryExecutionOutput::DeleteCount {
192                return Err(QueryError::invariant(
193                    "delete count execution requires delete query mode",
194                ));
195            }
196
197            #[cfg(feature = "diagnostics")]
198            if collect_attribution {
199                let (result, trace, phase) =
200                    self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
201                        executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
202                            plan, cursor,
203                        )
204                    })?;
205
206                return Ok(PreparedQueryExecutionOutcome::Grouped {
207                    result,
208                    trace,
209                    phase: Some(phase),
210                });
211            }
212
213            let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
214
215            return Ok(PreparedQueryExecutionOutcome::Grouped {
216                result,
217                trace,
218                #[cfg(feature = "diagnostics")]
219                phase: None,
220            });
221        }
222
223        match plan.mode() {
224            QueryMode::Load(_) => {
225                if output == PreparedQueryExecutionOutput::DeleteCount {
226                    return Err(QueryError::invariant(
227                        "delete count execution requires delete query mode",
228                    ));
229                }
230
231                #[cfg(feature = "diagnostics")]
232                if collect_attribution {
233                    let (rows, phase, response_decode_local_instructions) = self
234                        .load_executor::<E>()
235                        .execute_with_phase_attribution(plan)
236                        .map_err(QueryError::execute)?;
237
238                    return Ok(PreparedQueryExecutionOutcome::Scalar {
239                        rows,
240                        phase: Some(phase),
241                        response_decode_local_instructions,
242                    });
243                }
244
245                let rows = self
246                    .with_metrics(|| self.load_executor::<E>().execute(plan))
247                    .map_err(QueryError::execute)?;
248
249                Ok(PreparedQueryExecutionOutcome::Scalar {
250                    rows,
251                    #[cfg(feature = "diagnostics")]
252                    phase: None,
253                    #[cfg(feature = "diagnostics")]
254                    response_decode_local_instructions: 0,
255                })
256            }
257            QueryMode::Delete(_) => match output {
258                PreparedQueryExecutionOutput::Rows => {
259                    let rows = self
260                        .with_metrics(|| self.delete_executor::<E>().execute(plan))
261                        .map_err(QueryError::execute)?;
262
263                    Ok(PreparedQueryExecutionOutcome::Delete { rows })
264                }
265                PreparedQueryExecutionOutput::DeleteCount => {
266                    let row_count = self
267                        .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
268                        .map_err(QueryError::execute)?;
269
270                    Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
271                }
272            },
273        }
274    }
275
276    // Adapt the canonical prepared-plan outcome to the public load-query
277    // result shape. This is the only non-diagnostics adapter that understands
278    // the private scalar/grouped/delete execution outcome variants.
279    fn load_result_from_prepared_outcome<E>(
280        outcome: PreparedQueryExecutionOutcome<E>,
281    ) -> Result<LoadQueryResult<E>, QueryError>
282    where
283        E: PersistedRow<Canister = C> + EntityValue,
284    {
285        match outcome {
286            PreparedQueryExecutionOutcome::Scalar { rows, .. }
287            | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
288            PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
289                finalize_structural_grouped_projection_result(result, trace)
290                    .map(LoadQueryResult::Grouped)
291            }
292            PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant(
293                "delete count result cannot be converted to load query result",
294            )),
295        }
296    }
297
298    // Shared load-query terminal wrapper: build plan, run under metrics, map
299    // execution errors into query-facing errors.
300    pub(in crate::db) fn execute_with_plan<E, T>(
301        &self,
302        query: &Query<E>,
303        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
304    ) -> Result<T, QueryError>
305    where
306        E: PersistedRow<Canister = C> + EntityValue,
307    {
308        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
309
310        self.with_metrics(|| op(self.load_executor::<E>(), plan))
311            .map_err(QueryError::execute)
312    }
313}