Skip to main content

icydb_core/db/session/query/
execution.rs

1//! Module: db::session::query::execution
2//! Responsibility: canonical query execution dispatch and executor error mapping.
3//! Does not own: diagnostics attribution, cursor decoding, fluent adaptation, or explain surfaces.
4//! Boundary: maps prepared plans into executor calls and query-facing response/error types.
5
6#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9    db::{
10        DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11        diagnostics::ExecutionTrace,
12        executor::{
13            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14            StructuralGroupedProjectionResult,
15        },
16        query::plan::QueryMode,
17        session::finalize_structural_grouped_projection_result,
18    },
19    error::InternalError,
20    traits::{CanisterKind, EntityValue},
21};
22
23///
24/// PreparedQueryExecutionOutcome
25///
26/// PreparedQueryExecutionOutcome is the private shared result shape for one
27/// prepared query execution. Normal execution and diagnostics attribution use
28/// it to share scalar/grouped/delete dispatch without exposing executor DTOs
29/// outside the session query module.
30///
31#[allow(
32    clippy::large_enum_variant,
33    reason = "private dispatch enum keeps scalar, grouped, and delete outcomes explicit at the session boundary; boxing the grouped trace would add indirection to avoid a non-hot-path size warning"
34)]
35pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
36where
37    E: PersistedRow,
38{
39    Scalar {
40        rows: EntityResponse<E>,
41        #[cfg(feature = "diagnostics")]
42        phase: Option<ScalarExecutePhaseAttribution>,
43        #[cfg(feature = "diagnostics")]
44        response_decode_local_instructions: u64,
45    },
46    Grouped {
47        result: StructuralGroupedProjectionResult,
48        trace: Option<ExecutionTrace>,
49        #[cfg(feature = "diagnostics")]
50        phase: Option<GroupedExecutePhaseAttribution>,
51    },
52    Delete {
53        rows: EntityResponse<E>,
54    },
55    DeleteCount {
56        row_count: u32,
57    },
58}
59
60///
61/// PreparedQueryExecutionOutput
62///
63/// PreparedQueryExecutionOutput tells the shared prepared-plan seam whether a
64/// delete query should materialize deleted rows or use the count-only executor
65/// terminal. The mode exists so `execute_delete_count` can share the same
66/// session dispatch core without forcing row allocation.
67///
68
69#[derive(Clone, Copy, Debug, Eq, PartialEq)]
70pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
71    Rows,
72    DeleteCount,
73}
74
75// Convert executor plan-surface failures at the session boundary so query error
76// types do not import executor-owned error enums.
77pub(in crate::db::session) fn query_error_from_executor_plan_error(
78    err: ExecutorPlanError,
79) -> QueryError {
80    match err {
81        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
82    }
83}
84
85impl<C: CanisterKind> DbSession<C> {
86    // Validate that one execution strategy is admissible for scalar paged load
87    // execution and fail closed on grouped/primary-key-only routes.
88    pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
89        family: ExecutionFamily,
90    ) -> Result<(), QueryError> {
91        match family {
92            ExecutionFamily::Ordered => Ok(()),
93            ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
94        }
95    }
96
97    // Validate that one execution strategy is admissible for the grouped
98    // execution surface.
99    pub(in crate::db::session::query) fn ensure_grouped_execution_family(
100        family: ExecutionFamily,
101    ) -> Result<(), QueryError> {
102        match family {
103            ExecutionFamily::Grouped => Ok(()),
104            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
105        }
106    }
107
108    /// Execute one scalar load/delete query and return materialized response rows.
109    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
110    where
111        E: PersistedRow<Canister = C> + EntityValue,
112    {
113        self.execute_query_result(query)
114            .and_then(LoadQueryResult::into_rows)
115    }
116
117    /// Execute one scalar load query through a rows-only dispatch path.
118    ///
119    /// This keeps row-only fluent terminals from retaining grouped and delete
120    /// executor branches through the broad `LoadQueryResult` boundary.
121    pub fn execute_scalar_query_rows<E>(
122        &self,
123        query: &Query<E>,
124    ) -> Result<EntityResponse<E>, QueryError>
125    where
126        E: PersistedRow<Canister = C> + EntityValue,
127    {
128        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
129
130        if plan.is_grouped() {
131            return Err(QueryError::invariant());
132        }
133
134        match plan.mode() {
135            QueryMode::Load(_) => self
136                .with_metrics(|| self.load_executor::<E>().execute(plan))
137                .map_err(QueryError::execute),
138            QueryMode::Delete(_) => Err(QueryError::unsupported_query()),
139        }
140    }
141
142    // Execute one typed query through the unified row/grouped result surface so
143    // higher layers do not need to branch on grouped shape themselves.
144    #[doc(hidden)]
145    pub fn execute_query_result<E>(
146        &self,
147        query: &Query<E>,
148    ) -> Result<LoadQueryResult<E>, QueryError>
149    where
150        E: PersistedRow<Canister = C> + EntityValue,
151    {
152        // Phase 1: compile typed intent into one prepared execution-plan
153        // contract shared by scalar, grouped, and delete execution.
154        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
155
156        // Phase 2: execute through the canonical prepared-plan seam and adapt
157        // the private executor outcome into the public session result shape.
158        self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
159            .and_then(Self::load_result_from_prepared_outcome)
160    }
161
162    /// Execute one typed delete query and return only the affected-row count.
163    #[doc(hidden)]
164    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
165    where
166        E: PersistedRow<Canister = C> + EntityValue,
167    {
168        // Phase 1: fail closed if the caller routes a non-delete query here.
169        if !query.mode().is_delete() {
170            return Err(QueryError::unsupported_query());
171        }
172
173        // Phase 2: resolve one cached prepared execution-plan contract directly
174        // from the shared lower boundary instead of rebuilding it through the
175        // typed compiled-query wrapper.
176        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
177
178        // Phase 3: execute through the shared prepared-plan seam while keeping
179        // the count-only delete terminal that skips response-row materialization.
180        match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
181            PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
182            PreparedQueryExecutionOutcome::Scalar { .. }
183            | PreparedQueryExecutionOutcome::Grouped { .. }
184            | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
185        }
186    }
187
188    // Execute one prepared plan through the shared scalar/grouped/delete
189    // dispatch. Diagnostics can request phase-attribution executor entrypoints;
190    // normal execution keeps the existing non-attribution calls.
191    pub(in crate::db::session::query) fn execute_prepared<E>(
192        &self,
193        plan: PreparedExecutionPlan<E>,
194        collect_attribution: bool,
195        output: PreparedQueryExecutionOutput,
196    ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
197    where
198        E: PersistedRow<Canister = C> + EntityValue,
199    {
200        #[cfg(not(feature = "diagnostics"))]
201        let _ = collect_attribution;
202
203        if plan.is_grouped() {
204            if output == PreparedQueryExecutionOutput::DeleteCount {
205                return Err(QueryError::invariant());
206            }
207
208            #[cfg(feature = "diagnostics")]
209            if collect_attribution {
210                let (result, trace, phase) =
211                    self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
212                        executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
213                            plan, cursor,
214                        )
215                    })?;
216
217                return Ok(PreparedQueryExecutionOutcome::Grouped {
218                    result,
219                    trace,
220                    phase: Some(phase),
221                });
222            }
223
224            let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
225
226            return Ok(PreparedQueryExecutionOutcome::Grouped {
227                result,
228                trace,
229                #[cfg(feature = "diagnostics")]
230                phase: None,
231            });
232        }
233
234        match plan.mode() {
235            QueryMode::Load(_) => {
236                if output == PreparedQueryExecutionOutput::DeleteCount {
237                    return Err(QueryError::invariant());
238                }
239
240                #[cfg(feature = "diagnostics")]
241                if collect_attribution {
242                    let (rows, phase, response_decode_local_instructions) = self
243                        .load_executor::<E>()
244                        .execute_with_phase_attribution(plan)
245                        .map_err(QueryError::execute)?;
246
247                    return Ok(PreparedQueryExecutionOutcome::Scalar {
248                        rows,
249                        phase: Some(phase),
250                        response_decode_local_instructions,
251                    });
252                }
253
254                let rows = self
255                    .with_metrics(|| self.load_executor::<E>().execute(plan))
256                    .map_err(QueryError::execute)?;
257
258                Ok(PreparedQueryExecutionOutcome::Scalar {
259                    rows,
260                    #[cfg(feature = "diagnostics")]
261                    phase: None,
262                    #[cfg(feature = "diagnostics")]
263                    response_decode_local_instructions: 0,
264                })
265            }
266            QueryMode::Delete(_) => match output {
267                PreparedQueryExecutionOutput::Rows => {
268                    let rows = self
269                        .with_metrics(|| self.delete_executor::<E>().execute(plan))
270                        .map_err(QueryError::execute)?;
271
272                    Ok(PreparedQueryExecutionOutcome::Delete { rows })
273                }
274                PreparedQueryExecutionOutput::DeleteCount => {
275                    let row_count = self
276                        .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
277                        .map_err(QueryError::execute)?;
278
279                    Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
280                }
281            },
282        }
283    }
284
285    // Adapt the canonical prepared-plan outcome to the public load-query
286    // result shape. This is the only non-diagnostics adapter that understands
287    // the private scalar/grouped/delete execution outcome variants.
288    fn load_result_from_prepared_outcome<E>(
289        outcome: PreparedQueryExecutionOutcome<E>,
290    ) -> Result<LoadQueryResult<E>, QueryError>
291    where
292        E: PersistedRow<Canister = C> + EntityValue,
293    {
294        match outcome {
295            PreparedQueryExecutionOutcome::Scalar { rows, .. }
296            | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
297            PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
298                finalize_structural_grouped_projection_result(result, trace)
299                    .map(LoadQueryResult::Grouped)
300            }
301            PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
302        }
303    }
304
305    // Shared load-query terminal wrapper: build plan, run under metrics, map
306    // execution errors into query-facing errors.
307    pub(in crate::db) fn execute_with_plan<E, T>(
308        &self,
309        query: &Query<E>,
310        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
311    ) -> Result<T, QueryError>
312    where
313        E: PersistedRow<Canister = C> + EntityValue,
314    {
315        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
316
317        self.with_metrics(|| op(self.load_executor::<E>(), plan))
318            .map_err(QueryError::execute)
319    }
320}