Skip to main content

icydb_core/db/session/query/
execution.rs

1//! Module: db::session::query::execution
2//! Responsibility: canonical query execution dispatch and executor error mapping.
3//! Does not own: diagnostics attribution, cursor decoding, fluent adaptation, or explain surfaces.
4//! Boundary: maps prepared plans into executor calls and query-facing response/error types.
5
6#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9    db::{
10        DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11        cursor::CursorPlanError,
12        diagnostics::ExecutionTrace,
13        executor::{
14            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
15            StructuralGroupedProjectionResult,
16        },
17        query::plan::QueryMode,
18        session::finalize_structural_grouped_projection_result,
19    },
20    error::InternalError,
21    traits::{CanisterKind, EntityValue},
22};
23
24///
25/// PreparedQueryExecutionOutcome
26///
27/// PreparedQueryExecutionOutcome is the private shared result shape for one
28/// prepared query execution. Normal execution and diagnostics attribution use
29/// it to share scalar/grouped/delete dispatch without exposing executor DTOs
30/// outside the session query module.
31///
32#[expect(
33    clippy::large_enum_variant,
34    reason = "the grouped execution result stays inline to avoid adding a boxed allocation on query execution paths"
35)]
36pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
37where
38    E: PersistedRow,
39{
40    Scalar {
41        rows: EntityResponse<E>,
42        #[cfg(feature = "diagnostics")]
43        phase: Option<ScalarExecutePhaseAttribution>,
44        #[cfg(feature = "diagnostics")]
45        response_decode_local_instructions: u64,
46    },
47    Grouped {
48        result: StructuralGroupedProjectionResult,
49        trace: Option<ExecutionTrace>,
50        #[cfg(feature = "diagnostics")]
51        phase: Option<GroupedExecutePhaseAttribution>,
52    },
53    Delete {
54        rows: EntityResponse<E>,
55    },
56    DeleteCount {
57        row_count: u32,
58    },
59}
60
61///
62/// PreparedQueryExecutionOutput
63///
64/// PreparedQueryExecutionOutput tells the shared prepared-plan seam whether a
65/// delete query should materialize deleted rows or use the count-only executor
66/// terminal. The mode exists so `execute_delete_count` can share the same
67/// session dispatch core without forcing row allocation.
68///
69
70#[derive(Clone, Copy, Debug, Eq, PartialEq)]
71pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
72    Rows,
73    DeleteCount,
74}
75
76// Convert executor plan-surface failures at the session boundary so query error
77// types do not import executor-owned error enums.
78pub(in crate::db::session) fn query_error_from_executor_plan_error(
79    err: ExecutorPlanError,
80) -> QueryError {
81    match err {
82        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
83    }
84}
85
86impl<C: CanisterKind> DbSession<C> {
87    // Validate that one execution strategy is admissible for scalar paged load
88    // execution and fail closed on grouped/primary-key-only routes.
89    pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
90        family: ExecutionFamily,
91    ) -> Result<(), QueryError> {
92        match family {
93            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
94                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
95            )),
96            ExecutionFamily::Ordered => Ok(()),
97            ExecutionFamily::Grouped => Err(QueryError::invariant(
98                "grouped queries execute via execute(), not page().execute()",
99            )),
100        }
101    }
102
103    // Validate that one execution strategy is admissible for the grouped
104    // execution surface.
105    pub(in crate::db::session::query) fn ensure_grouped_execution_family(
106        family: ExecutionFamily,
107    ) -> Result<(), QueryError> {
108        match family {
109            ExecutionFamily::Grouped => Ok(()),
110            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
111                "grouped execution requires grouped logical plans",
112            )),
113        }
114    }
115
116    /// Execute one scalar load/delete query and return materialized response rows.
117    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
118    where
119        E: PersistedRow<Canister = C> + EntityValue,
120    {
121        self.execute_query_result(query)
122            .and_then(LoadQueryResult::into_rows)
123    }
124
125    // Execute one typed query through the unified row/grouped result surface so
126    // higher layers do not need to branch on grouped shape themselves.
127    #[doc(hidden)]
128    pub fn execute_query_result<E>(
129        &self,
130        query: &Query<E>,
131    ) -> Result<LoadQueryResult<E>, QueryError>
132    where
133        E: PersistedRow<Canister = C> + EntityValue,
134    {
135        // Phase 1: compile typed intent into one prepared execution-plan
136        // contract shared by scalar, grouped, and delete execution.
137        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
138
139        // Phase 2: execute through the canonical prepared-plan seam and adapt
140        // the private executor outcome into the public session result shape.
141        self.execute_prepared(query, plan, false, PreparedQueryExecutionOutput::Rows)
142            .and_then(Self::load_result_from_prepared_outcome)
143    }
144
145    /// Execute one typed delete query and return only the affected-row count.
146    #[doc(hidden)]
147    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
148    where
149        E: PersistedRow<Canister = C> + EntityValue,
150    {
151        // Phase 1: fail closed if the caller routes a non-delete query here.
152        if !query.mode().is_delete() {
153            return Err(QueryError::unsupported_query(
154                "delete count execution requires delete query mode",
155            ));
156        }
157
158        // Phase 2: resolve one cached prepared execution-plan contract directly
159        // from the shared lower boundary instead of rebuilding it through the
160        // typed compiled-query wrapper.
161        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
162
163        // Phase 3: execute through the shared prepared-plan seam while keeping
164        // the count-only delete terminal that skips response-row materialization.
165        match self.execute_prepared(
166            query,
167            plan,
168            false,
169            PreparedQueryExecutionOutput::DeleteCount,
170        )? {
171            PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
172            PreparedQueryExecutionOutcome::Scalar { .. }
173            | PreparedQueryExecutionOutcome::Grouped { .. }
174            | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant(
175                "delete count execution returned non-count result",
176            )),
177        }
178    }
179
180    // Execute one prepared plan through the shared scalar/grouped/delete
181    // dispatch. Diagnostics can request phase-attribution executor entrypoints;
182    // normal execution keeps the existing non-attribution calls.
183    pub(in crate::db::session::query) fn execute_prepared<E>(
184        &self,
185        query: &Query<E>,
186        plan: PreparedExecutionPlan<E>,
187        collect_attribution: bool,
188        output: PreparedQueryExecutionOutput,
189    ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
190    where
191        E: PersistedRow<Canister = C> + EntityValue,
192    {
193        #[cfg(not(feature = "diagnostics"))]
194        let _ = collect_attribution;
195
196        if query.has_grouping() {
197            if output == PreparedQueryExecutionOutput::DeleteCount {
198                return Err(QueryError::invariant(
199                    "delete count execution requires delete query mode",
200                ));
201            }
202
203            #[cfg(feature = "diagnostics")]
204            if collect_attribution {
205                let (result, trace, phase) =
206                    self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
207                        executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
208                            plan, cursor,
209                        )
210                    })?;
211
212                return Ok(PreparedQueryExecutionOutcome::Grouped {
213                    result,
214                    trace,
215                    phase: Some(phase),
216                });
217            }
218
219            let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
220
221            return Ok(PreparedQueryExecutionOutcome::Grouped {
222                result,
223                trace,
224                #[cfg(feature = "diagnostics")]
225                phase: None,
226            });
227        }
228
229        match query.mode() {
230            QueryMode::Load(_) => {
231                if output == PreparedQueryExecutionOutput::DeleteCount {
232                    return Err(QueryError::invariant(
233                        "delete count execution requires delete query mode",
234                    ));
235                }
236
237                #[cfg(feature = "diagnostics")]
238                if collect_attribution {
239                    let (rows, phase, response_decode_local_instructions) = self
240                        .load_executor::<E>()
241                        .execute_with_phase_attribution(plan)
242                        .map_err(QueryError::execute)?;
243
244                    return Ok(PreparedQueryExecutionOutcome::Scalar {
245                        rows,
246                        phase: Some(phase),
247                        response_decode_local_instructions,
248                    });
249                }
250
251                let rows = self
252                    .with_metrics(|| self.load_executor::<E>().execute(plan))
253                    .map_err(QueryError::execute)?;
254
255                Ok(PreparedQueryExecutionOutcome::Scalar {
256                    rows,
257                    #[cfg(feature = "diagnostics")]
258                    phase: None,
259                    #[cfg(feature = "diagnostics")]
260                    response_decode_local_instructions: 0,
261                })
262            }
263            QueryMode::Delete(_) => match output {
264                PreparedQueryExecutionOutput::Rows => {
265                    let rows = self
266                        .with_metrics(|| self.delete_executor::<E>().execute(plan))
267                        .map_err(QueryError::execute)?;
268
269                    Ok(PreparedQueryExecutionOutcome::Delete { rows })
270                }
271                PreparedQueryExecutionOutput::DeleteCount => {
272                    let row_count = self
273                        .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
274                        .map_err(QueryError::execute)?;
275
276                    Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
277                }
278            },
279        }
280    }
281
282    // Adapt the canonical prepared-plan outcome to the public load-query
283    // result shape. This is the only non-diagnostics adapter that understands
284    // the private scalar/grouped/delete execution outcome variants.
285    fn load_result_from_prepared_outcome<E>(
286        outcome: PreparedQueryExecutionOutcome<E>,
287    ) -> Result<LoadQueryResult<E>, QueryError>
288    where
289        E: PersistedRow<Canister = C> + EntityValue,
290    {
291        match outcome {
292            PreparedQueryExecutionOutcome::Scalar { rows, .. }
293            | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
294            PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
295                finalize_structural_grouped_projection_result(result, trace)
296                    .map(LoadQueryResult::Grouped)
297            }
298            PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant(
299                "delete count result cannot be converted to load query result",
300            )),
301        }
302    }
303
304    // Shared load-query terminal wrapper: build plan, run under metrics, map
305    // execution errors into query-facing errors.
306    pub(in crate::db) fn execute_with_plan<E, T>(
307        &self,
308        query: &Query<E>,
309        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
310    ) -> Result<T, QueryError>
311    where
312        E: PersistedRow<Canister = C> + EntityValue,
313    {
314        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
315
316        self.with_metrics(|| op(self.load_executor::<E>(), plan))
317            .map_err(QueryError::execute)
318    }
319}