Skip to main content

icydb_core/db/session/query/
execution.rs

1//! Module: db::session::query::execution
2//! Responsibility: canonical query execution dispatch and executor error mapping.
3//! Does not own: diagnostics attribution, cursor decoding, fluent adaptation, or explain surfaces.
4//! Boundary: maps prepared plans into executor calls and query-facing response/error types.
5
6#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9    db::{
10        DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11        diagnostics::ExecutionTrace,
12        executor::{
13            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14            StructuralGroupedProjectionResult,
15        },
16        query::plan::QueryMode,
17        session::finalize_structural_grouped_projection_result,
18    },
19    error::InternalError,
20    traits::{CanisterKind, EntityValue},
21};
22
23///
24/// PreparedQueryExecutionOutcome
25///
26/// PreparedQueryExecutionOutcome is the private shared result shape for one
27/// prepared query execution. Normal execution and diagnostics attribution use
28/// it to share scalar/grouped/delete dispatch without exposing executor DTOs
29/// outside the session query module.
30///
31#[cfg_attr(
32    not(feature = "diagnostics"),
33    expect(
34        clippy::large_enum_variant,
35        reason = "non-diagnostics builds keep the grouped execution trace inline to avoid boxing a private session-boundary outcome"
36    )
37)]
38pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
39where
40    E: PersistedRow,
41{
42    Scalar {
43        rows: EntityResponse<E>,
44        #[cfg(feature = "diagnostics")]
45        phase: Option<ScalarExecutePhaseAttribution>,
46        #[cfg(feature = "diagnostics")]
47        response_decode_local_instructions: u64,
48    },
49    Grouped {
50        result: StructuralGroupedProjectionResult,
51        trace: Option<ExecutionTrace>,
52        #[cfg(feature = "diagnostics")]
53        phase: Option<GroupedExecutePhaseAttribution>,
54    },
55    Delete {
56        rows: EntityResponse<E>,
57    },
58    DeleteCount {
59        row_count: u32,
60    },
61}
62
63///
64/// PreparedQueryExecutionOutput
65///
66/// PreparedQueryExecutionOutput tells the shared prepared-plan seam whether a
67/// delete query should materialize deleted rows or use the count-only executor
68/// terminal. The mode exists so `execute_delete_count` can share the same
69/// session dispatch core without forcing row allocation.
70///
71
72#[derive(Clone, Copy, Debug, Eq, PartialEq)]
73pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
74    Rows,
75    DeleteCount,
76}
77
78// Convert executor plan-surface failures at the session boundary so query error
79// types do not import executor-owned error enums.
80pub(in crate::db::session) fn query_error_from_executor_plan_error(
81    err: ExecutorPlanError,
82) -> QueryError {
83    match err {
84        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
85    }
86}
87
88impl<C: CanisterKind> DbSession<C> {
89    // Validate that one execution strategy is admissible for scalar paged load
90    // execution and fail closed on grouped/primary-key-only routes.
91    pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
92        family: ExecutionFamily,
93    ) -> Result<(), QueryError> {
94        match family {
95            ExecutionFamily::Ordered => Ok(()),
96            ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
97        }
98    }
99
100    // Validate that one execution strategy is admissible for the grouped
101    // execution surface.
102    pub(in crate::db::session::query) fn ensure_grouped_execution_family(
103        family: ExecutionFamily,
104    ) -> Result<(), QueryError> {
105        match family {
106            ExecutionFamily::Grouped => Ok(()),
107            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
108        }
109    }
110
111    /// Execute one scalar load/delete query and return materialized response rows.
112    pub(in crate::db) fn execute_query<E>(
113        &self,
114        query: &Query<E>,
115    ) -> Result<EntityResponse<E>, QueryError>
116    where
117        E: PersistedRow<Canister = C> + EntityValue,
118    {
119        self.execute_query_result(query)
120            .and_then(LoadQueryResult::into_rows)
121    }
122
123    /// Execute one scalar load query through a rows-only dispatch path.
124    ///
125    /// This keeps row-only fluent terminals from retaining grouped and delete
126    /// executor branches through the broad `LoadQueryResult` boundary.
127    pub fn execute_scalar_query_rows<E>(
128        &self,
129        query: &Query<E>,
130    ) -> Result<EntityResponse<E>, QueryError>
131    where
132        E: PersistedRow<Canister = C> + EntityValue,
133    {
134        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
135
136        if plan.is_grouped() {
137            return Err(QueryError::invariant());
138        }
139
140        match plan.mode() {
141            QueryMode::Load(_) => self
142                .with_metrics(|| self.load_executor::<E>().execute(plan))
143                .map_err(QueryError::execute),
144            QueryMode::Delete(_) => Err(QueryError::unsupported_query()),
145        }
146    }
147
148    // Execute one typed query through the unified row/grouped result surface so
149    // higher layers do not need to branch on grouped shape themselves.
150    #[doc(hidden)]
151    pub fn execute_query_result<E>(
152        &self,
153        query: &Query<E>,
154    ) -> Result<LoadQueryResult<E>, QueryError>
155    where
156        E: PersistedRow<Canister = C> + EntityValue,
157    {
158        // Phase 1: compile typed intent into one prepared execution-plan
159        // contract shared by scalar, grouped, and delete execution.
160        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
161
162        // Phase 2: execute through the canonical prepared-plan seam and adapt
163        // the private executor outcome into the public session result shape.
164        self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
165            .and_then(Self::load_result_from_prepared_outcome)
166    }
167
168    /// Execute one typed delete query and return only the affected-row count.
169    #[doc(hidden)]
170    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
171    where
172        E: PersistedRow<Canister = C> + EntityValue,
173    {
174        // Phase 1: fail closed if the caller routes a non-delete query here.
175        if !query.mode().is_delete() {
176            return Err(QueryError::unsupported_query());
177        }
178
179        // Phase 2: resolve one cached prepared execution-plan contract directly
180        // from the shared lower boundary instead of rebuilding it through the
181        // typed compiled-query wrapper.
182        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
183
184        // Phase 3: execute through the shared prepared-plan seam while keeping
185        // the count-only delete terminal that skips response-row materialization.
186        match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
187            PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
188            PreparedQueryExecutionOutcome::Scalar { .. }
189            | PreparedQueryExecutionOutcome::Grouped { .. }
190            | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
191        }
192    }
193
194    // Execute one prepared plan through the shared scalar/grouped/delete
195    // dispatch. Diagnostics can request phase-attribution executor entrypoints;
196    // normal execution keeps the existing non-attribution calls.
197    pub(in crate::db::session::query) fn execute_prepared<E>(
198        &self,
199        plan: PreparedExecutionPlan<E>,
200        collect_attribution: bool,
201        output: PreparedQueryExecutionOutput,
202    ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
203    where
204        E: PersistedRow<Canister = C> + EntityValue,
205    {
206        #[cfg(not(feature = "diagnostics"))]
207        let _ = collect_attribution;
208
209        if plan.is_grouped() {
210            if output == PreparedQueryExecutionOutput::DeleteCount {
211                return Err(QueryError::invariant());
212            }
213
214            #[cfg(feature = "diagnostics")]
215            if collect_attribution {
216                let (result, trace, phase) =
217                    self.execute_grouped_with_phase_attribution(plan, None)?;
218
219                return Ok(PreparedQueryExecutionOutcome::Grouped {
220                    result,
221                    trace,
222                    phase: Some(phase),
223                });
224            }
225
226            let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
227
228            return Ok(PreparedQueryExecutionOutcome::Grouped {
229                result,
230                trace,
231                #[cfg(feature = "diagnostics")]
232                phase: None,
233            });
234        }
235
236        match plan.mode() {
237            QueryMode::Load(_) => {
238                if output == PreparedQueryExecutionOutput::DeleteCount {
239                    return Err(QueryError::invariant());
240                }
241
242                #[cfg(feature = "diagnostics")]
243                if collect_attribution {
244                    let (rows, phase, response_decode_local_instructions) = self
245                        .load_executor::<E>()
246                        .execute_with_phase_attribution(plan)
247                        .map_err(QueryError::execute)?;
248
249                    return Ok(PreparedQueryExecutionOutcome::Scalar {
250                        rows,
251                        phase: Some(phase),
252                        response_decode_local_instructions,
253                    });
254                }
255
256                let rows = self
257                    .with_metrics(|| self.load_executor::<E>().execute(plan))
258                    .map_err(QueryError::execute)?;
259
260                Ok(PreparedQueryExecutionOutcome::Scalar {
261                    rows,
262                    #[cfg(feature = "diagnostics")]
263                    phase: None,
264                    #[cfg(feature = "diagnostics")]
265                    response_decode_local_instructions: 0,
266                })
267            }
268            QueryMode::Delete(_) => match output {
269                PreparedQueryExecutionOutput::Rows => {
270                    let rows = self
271                        .with_metrics(|| self.delete_executor::<E>().execute(plan))
272                        .map_err(QueryError::execute)?;
273
274                    Ok(PreparedQueryExecutionOutcome::Delete { rows })
275                }
276                PreparedQueryExecutionOutput::DeleteCount => {
277                    let row_count = self
278                        .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
279                        .map_err(QueryError::execute)?;
280
281                    Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
282                }
283            },
284        }
285    }
286
287    // Adapt the canonical prepared-plan outcome to the public load-query
288    // result shape. This is the only non-diagnostics adapter that understands
289    // the private scalar/grouped/delete execution outcome variants.
290    fn load_result_from_prepared_outcome<E>(
291        outcome: PreparedQueryExecutionOutcome<E>,
292    ) -> Result<LoadQueryResult<E>, QueryError>
293    where
294        E: PersistedRow<Canister = C> + EntityValue,
295    {
296        match outcome {
297            PreparedQueryExecutionOutcome::Scalar { rows, .. }
298            | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
299            PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
300                finalize_structural_grouped_projection_result(result, trace)
301                    .map(LoadQueryResult::Grouped)
302            }
303            PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
304        }
305    }
306
307    // Shared load-query terminal wrapper: build plan, run under metrics, map
308    // execution errors into query-facing errors.
309    pub(in crate::db) fn execute_with_plan<E, T>(
310        &self,
311        query: &Query<E>,
312        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
313    ) -> Result<T, QueryError>
314    where
315        E: PersistedRow<Canister = C> + EntityValue,
316    {
317        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
318
319        self.with_metrics(|| op(self.load_executor::<E>(), plan))
320            .map_err(QueryError::execute)
321    }
322}