Skip to main content

icydb_core/db/session/query/
execution.rs

//! Module: db::session::query::execution
//! Responsibility: canonical query execution dispatch and executor error mapping.
//! Does not own: diagnostics attribution, cursor decoding, fluent adaptation, or explain surfaces.
//! Boundary: maps prepared plans into executor calls and query-facing response/error types.
5
6#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9    db::{
10        DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11        diagnostics::ExecutionTrace,
12        executor::{
13            ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14            StructuralGroupedProjectionResult,
15        },
16        query::plan::QueryMode,
17        session::finalize_structural_grouped_projection_result,
18    },
19    error::InternalError,
20    traits::{CanisterKind, EntityValue},
21};
22
23///
24/// PreparedQueryExecutionOutcome
25///
26/// PreparedQueryExecutionOutcome is the private shared result shape for one
27/// prepared query execution. Normal execution and diagnostics attribution use
28/// it to share scalar/grouped/delete dispatch without exposing executor DTOs
29/// outside the session query module.
30///
31#[cfg_attr(
32    not(feature = "diagnostics"),
33    expect(
34        clippy::large_enum_variant,
35        reason = "non-diagnostics builds keep the grouped execution trace inline to avoid boxing a private session-boundary outcome"
36    )
37)]
38pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
39where
40    E: PersistedRow,
41{
42    Scalar {
43        rows: EntityResponse<E>,
44        #[cfg(feature = "diagnostics")]
45        phase: Option<ScalarExecutePhaseAttribution>,
46        #[cfg(feature = "diagnostics")]
47        response_decode_local_instructions: u64,
48    },
49    Grouped {
50        result: StructuralGroupedProjectionResult,
51        trace: Option<ExecutionTrace>,
52        #[cfg(feature = "diagnostics")]
53        phase: Option<GroupedExecutePhaseAttribution>,
54    },
55    Delete {
56        rows: EntityResponse<E>,
57    },
58    DeleteCount {
59        row_count: u32,
60    },
61}
62
63///
64/// PreparedQueryExecutionOutput
65///
66/// PreparedQueryExecutionOutput tells the shared prepared-plan seam whether a
67/// delete query should materialize deleted rows or use the count-only executor
68/// terminal. The mode exists so `execute_delete_count` can share the same
69/// session dispatch core without forcing row allocation.
70///
71
72#[derive(Clone, Copy, Debug, Eq, PartialEq)]
73pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
74    Rows,
75    DeleteCount,
76}
77
78// Convert executor plan-surface failures at the session boundary so query error
79// types do not import executor-owned error enums.
80pub(in crate::db::session) fn query_error_from_executor_plan_error(
81    err: ExecutorPlanError,
82) -> QueryError {
83    match err {
84        ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
85    }
86}
87
88impl<C: CanisterKind> DbSession<C> {
89    // Validate that one execution strategy is admissible for scalar paged load
90    // execution and fail closed on grouped/primary-key-only routes.
91    pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
92        family: ExecutionFamily,
93    ) -> Result<(), QueryError> {
94        match family {
95            ExecutionFamily::Ordered => Ok(()),
96            ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
97        }
98    }
99
100    // Validate that one execution strategy is admissible for the grouped
101    // execution surface.
102    pub(in crate::db::session::query) fn ensure_grouped_execution_family(
103        family: ExecutionFamily,
104    ) -> Result<(), QueryError> {
105        match family {
106            ExecutionFamily::Grouped => Ok(()),
107            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
108        }
109    }
110
111    /// Execute one scalar load/delete query and return materialized response rows.
112    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
113    where
114        E: PersistedRow<Canister = C> + EntityValue,
115    {
116        self.execute_query_result(query)
117            .and_then(LoadQueryResult::into_rows)
118    }
119
120    /// Execute one scalar load query through a rows-only dispatch path.
121    ///
122    /// This keeps row-only fluent terminals from retaining grouped and delete
123    /// executor branches through the broad `LoadQueryResult` boundary.
124    pub fn execute_scalar_query_rows<E>(
125        &self,
126        query: &Query<E>,
127    ) -> Result<EntityResponse<E>, QueryError>
128    where
129        E: PersistedRow<Canister = C> + EntityValue,
130    {
131        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
132
133        if plan.is_grouped() {
134            return Err(QueryError::invariant());
135        }
136
137        match plan.mode() {
138            QueryMode::Load(_) => self
139                .with_metrics(|| self.load_executor::<E>().execute(plan))
140                .map_err(QueryError::execute),
141            QueryMode::Delete(_) => Err(QueryError::unsupported_query()),
142        }
143    }
144
145    // Execute one typed query through the unified row/grouped result surface so
146    // higher layers do not need to branch on grouped shape themselves.
147    #[doc(hidden)]
148    pub fn execute_query_result<E>(
149        &self,
150        query: &Query<E>,
151    ) -> Result<LoadQueryResult<E>, QueryError>
152    where
153        E: PersistedRow<Canister = C> + EntityValue,
154    {
155        // Phase 1: compile typed intent into one prepared execution-plan
156        // contract shared by scalar, grouped, and delete execution.
157        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
158
159        // Phase 2: execute through the canonical prepared-plan seam and adapt
160        // the private executor outcome into the public session result shape.
161        self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
162            .and_then(Self::load_result_from_prepared_outcome)
163    }
164
165    /// Execute one typed delete query and return only the affected-row count.
166    #[doc(hidden)]
167    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
168    where
169        E: PersistedRow<Canister = C> + EntityValue,
170    {
171        // Phase 1: fail closed if the caller routes a non-delete query here.
172        if !query.mode().is_delete() {
173            return Err(QueryError::unsupported_query());
174        }
175
176        // Phase 2: resolve one cached prepared execution-plan contract directly
177        // from the shared lower boundary instead of rebuilding it through the
178        // typed compiled-query wrapper.
179        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
180
181        // Phase 3: execute through the shared prepared-plan seam while keeping
182        // the count-only delete terminal that skips response-row materialization.
183        match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
184            PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
185            PreparedQueryExecutionOutcome::Scalar { .. }
186            | PreparedQueryExecutionOutcome::Grouped { .. }
187            | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
188        }
189    }
190
191    // Execute one prepared plan through the shared scalar/grouped/delete
192    // dispatch. Diagnostics can request phase-attribution executor entrypoints;
193    // normal execution keeps the existing non-attribution calls.
194    pub(in crate::db::session::query) fn execute_prepared<E>(
195        &self,
196        plan: PreparedExecutionPlan<E>,
197        collect_attribution: bool,
198        output: PreparedQueryExecutionOutput,
199    ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
200    where
201        E: PersistedRow<Canister = C> + EntityValue,
202    {
203        #[cfg(not(feature = "diagnostics"))]
204        let _ = collect_attribution;
205
206        if plan.is_grouped() {
207            if output == PreparedQueryExecutionOutput::DeleteCount {
208                return Err(QueryError::invariant());
209            }
210
211            #[cfg(feature = "diagnostics")]
212            if collect_attribution {
213                let (result, trace, phase) =
214                    self.execute_grouped_with_phase_attribution(plan, None)?;
215
216                return Ok(PreparedQueryExecutionOutcome::Grouped {
217                    result,
218                    trace,
219                    phase: Some(phase),
220                });
221            }
222
223            let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
224
225            return Ok(PreparedQueryExecutionOutcome::Grouped {
226                result,
227                trace,
228                #[cfg(feature = "diagnostics")]
229                phase: None,
230            });
231        }
232
233        match plan.mode() {
234            QueryMode::Load(_) => {
235                if output == PreparedQueryExecutionOutput::DeleteCount {
236                    return Err(QueryError::invariant());
237                }
238
239                #[cfg(feature = "diagnostics")]
240                if collect_attribution {
241                    let (rows, phase, response_decode_local_instructions) = self
242                        .load_executor::<E>()
243                        .execute_with_phase_attribution(plan)
244                        .map_err(QueryError::execute)?;
245
246                    return Ok(PreparedQueryExecutionOutcome::Scalar {
247                        rows,
248                        phase: Some(phase),
249                        response_decode_local_instructions,
250                    });
251                }
252
253                let rows = self
254                    .with_metrics(|| self.load_executor::<E>().execute(plan))
255                    .map_err(QueryError::execute)?;
256
257                Ok(PreparedQueryExecutionOutcome::Scalar {
258                    rows,
259                    #[cfg(feature = "diagnostics")]
260                    phase: None,
261                    #[cfg(feature = "diagnostics")]
262                    response_decode_local_instructions: 0,
263                })
264            }
265            QueryMode::Delete(_) => match output {
266                PreparedQueryExecutionOutput::Rows => {
267                    let rows = self
268                        .with_metrics(|| self.delete_executor::<E>().execute(plan))
269                        .map_err(QueryError::execute)?;
270
271                    Ok(PreparedQueryExecutionOutcome::Delete { rows })
272                }
273                PreparedQueryExecutionOutput::DeleteCount => {
274                    let row_count = self
275                        .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
276                        .map_err(QueryError::execute)?;
277
278                    Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
279                }
280            },
281        }
282    }
283
284    // Adapt the canonical prepared-plan outcome to the public load-query
285    // result shape. This is the only non-diagnostics adapter that understands
286    // the private scalar/grouped/delete execution outcome variants.
287    fn load_result_from_prepared_outcome<E>(
288        outcome: PreparedQueryExecutionOutcome<E>,
289    ) -> Result<LoadQueryResult<E>, QueryError>
290    where
291        E: PersistedRow<Canister = C> + EntityValue,
292    {
293        match outcome {
294            PreparedQueryExecutionOutcome::Scalar { rows, .. }
295            | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
296            PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
297                finalize_structural_grouped_projection_result(result, trace)
298                    .map(LoadQueryResult::Grouped)
299            }
300            PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
301        }
302    }
303
304    // Shared load-query terminal wrapper: build plan, run under metrics, map
305    // execution errors into query-facing errors.
306    pub(in crate::db) fn execute_with_plan<E, T>(
307        &self,
308        query: &Query<E>,
309        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
310    ) -> Result<T, QueryError>
311    where
312        E: PersistedRow<Canister = C> + EntityValue,
313    {
314        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
315
316        self.with_metrics(|| op(self.load_executor::<E>(), plan))
317            .map_err(QueryError::execute)
318    }
319}