icydb_core/db/session/query/
execution.rs1#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9 db::{
10 DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11 cursor::CursorPlanError,
12 diagnostics::ExecutionTrace,
13 executor::{
14 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
15 StructuralGroupedProjectionResult,
16 },
17 query::plan::QueryMode,
18 session::finalize_structural_grouped_projection_result,
19 },
20 error::InternalError,
21 traits::{CanisterKind, EntityValue},
22};
23
24#[expect(
33 clippy::large_enum_variant,
34 reason = "the grouped execution result stays inline to avoid adding a boxed allocation on query execution paths"
35)]
36pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
37where
38 E: PersistedRow,
39{
40 Scalar {
41 rows: EntityResponse<E>,
42 #[cfg(feature = "diagnostics")]
43 phase: Option<ScalarExecutePhaseAttribution>,
44 #[cfg(feature = "diagnostics")]
45 response_decode_local_instructions: u64,
46 },
47 Grouped {
48 result: StructuralGroupedProjectionResult,
49 trace: Option<ExecutionTrace>,
50 #[cfg(feature = "diagnostics")]
51 phase: Option<GroupedExecutePhaseAttribution>,
52 },
53 Delete {
54 rows: EntityResponse<E>,
55 },
56 DeleteCount {
57 row_count: u32,
58 },
59}
60
61#[derive(Clone, Copy, Debug, Eq, PartialEq)]
71pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
72 Rows,
73 DeleteCount,
74}
75
76pub(in crate::db::session) fn query_error_from_executor_plan_error(
79 err: ExecutorPlanError,
80) -> QueryError {
81 match err {
82 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
83 }
84}
85
86impl<C: CanisterKind> DbSession<C> {
87 pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
90 family: ExecutionFamily,
91 ) -> Result<(), QueryError> {
92 match family {
93 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
94 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
95 )),
96 ExecutionFamily::Ordered => Ok(()),
97 ExecutionFamily::Grouped => Err(QueryError::invariant(
98 "grouped queries execute via execute(), not page().execute()",
99 )),
100 }
101 }
102
103 pub(in crate::db::session::query) fn ensure_grouped_execution_family(
106 family: ExecutionFamily,
107 ) -> Result<(), QueryError> {
108 match family {
109 ExecutionFamily::Grouped => Ok(()),
110 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
111 "grouped execution requires grouped logical plans",
112 )),
113 }
114 }
115
116 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
118 where
119 E: PersistedRow<Canister = C> + EntityValue,
120 {
121 self.execute_query_result(query)
122 .and_then(LoadQueryResult::into_rows)
123 }
124
125 #[doc(hidden)]
128 pub fn execute_query_result<E>(
129 &self,
130 query: &Query<E>,
131 ) -> Result<LoadQueryResult<E>, QueryError>
132 where
133 E: PersistedRow<Canister = C> + EntityValue,
134 {
135 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
138
139 self.execute_prepared(query, plan, false, PreparedQueryExecutionOutput::Rows)
142 .and_then(Self::load_result_from_prepared_outcome)
143 }
144
145 #[doc(hidden)]
147 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
148 where
149 E: PersistedRow<Canister = C> + EntityValue,
150 {
151 if !query.mode().is_delete() {
153 return Err(QueryError::unsupported_query(
154 "delete count execution requires delete query mode",
155 ));
156 }
157
158 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
162
163 match self.execute_prepared(
166 query,
167 plan,
168 false,
169 PreparedQueryExecutionOutput::DeleteCount,
170 )? {
171 PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
172 PreparedQueryExecutionOutcome::Scalar { .. }
173 | PreparedQueryExecutionOutcome::Grouped { .. }
174 | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant(
175 "delete count execution returned non-count result",
176 )),
177 }
178 }
179
180 pub(in crate::db::session::query) fn execute_prepared<E>(
184 &self,
185 query: &Query<E>,
186 plan: PreparedExecutionPlan<E>,
187 collect_attribution: bool,
188 output: PreparedQueryExecutionOutput,
189 ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
190 where
191 E: PersistedRow<Canister = C> + EntityValue,
192 {
193 #[cfg(not(feature = "diagnostics"))]
194 let _ = collect_attribution;
195
196 if query.has_grouping() {
197 if output == PreparedQueryExecutionOutput::DeleteCount {
198 return Err(QueryError::invariant(
199 "delete count execution requires delete query mode",
200 ));
201 }
202
203 #[cfg(feature = "diagnostics")]
204 if collect_attribution {
205 let (result, trace, phase) =
206 self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
207 executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
208 plan, cursor,
209 )
210 })?;
211
212 return Ok(PreparedQueryExecutionOutcome::Grouped {
213 result,
214 trace,
215 phase: Some(phase),
216 });
217 }
218
219 let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
220
221 return Ok(PreparedQueryExecutionOutcome::Grouped {
222 result,
223 trace,
224 #[cfg(feature = "diagnostics")]
225 phase: None,
226 });
227 }
228
229 match query.mode() {
230 QueryMode::Load(_) => {
231 if output == PreparedQueryExecutionOutput::DeleteCount {
232 return Err(QueryError::invariant(
233 "delete count execution requires delete query mode",
234 ));
235 }
236
237 #[cfg(feature = "diagnostics")]
238 if collect_attribution {
239 let (rows, phase, response_decode_local_instructions) = self
240 .load_executor::<E>()
241 .execute_with_phase_attribution(plan)
242 .map_err(QueryError::execute)?;
243
244 return Ok(PreparedQueryExecutionOutcome::Scalar {
245 rows,
246 phase: Some(phase),
247 response_decode_local_instructions,
248 });
249 }
250
251 let rows = self
252 .with_metrics(|| self.load_executor::<E>().execute(plan))
253 .map_err(QueryError::execute)?;
254
255 Ok(PreparedQueryExecutionOutcome::Scalar {
256 rows,
257 #[cfg(feature = "diagnostics")]
258 phase: None,
259 #[cfg(feature = "diagnostics")]
260 response_decode_local_instructions: 0,
261 })
262 }
263 QueryMode::Delete(_) => match output {
264 PreparedQueryExecutionOutput::Rows => {
265 let rows = self
266 .with_metrics(|| self.delete_executor::<E>().execute(plan))
267 .map_err(QueryError::execute)?;
268
269 Ok(PreparedQueryExecutionOutcome::Delete { rows })
270 }
271 PreparedQueryExecutionOutput::DeleteCount => {
272 let row_count = self
273 .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
274 .map_err(QueryError::execute)?;
275
276 Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
277 }
278 },
279 }
280 }
281
282 fn load_result_from_prepared_outcome<E>(
286 outcome: PreparedQueryExecutionOutcome<E>,
287 ) -> Result<LoadQueryResult<E>, QueryError>
288 where
289 E: PersistedRow<Canister = C> + EntityValue,
290 {
291 match outcome {
292 PreparedQueryExecutionOutcome::Scalar { rows, .. }
293 | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
294 PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
295 finalize_structural_grouped_projection_result(result, trace)
296 .map(LoadQueryResult::Grouped)
297 }
298 PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant(
299 "delete count result cannot be converted to load query result",
300 )),
301 }
302 }
303
304 pub(in crate::db) fn execute_with_plan<E, T>(
307 &self,
308 query: &Query<E>,
309 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
310 ) -> Result<T, QueryError>
311 where
312 E: PersistedRow<Canister = C> + EntityValue,
313 {
314 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
315
316 self.with_metrics(|| op(self.load_executor::<E>(), plan))
317 .map_err(QueryError::execute)
318 }
319}