icydb_core/db/session/query/
execution.rs1#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9 db::{
10 DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11 diagnostics::ExecutionTrace,
12 executor::{
13 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14 StructuralGroupedProjectionResult,
15 },
16 query::plan::QueryMode,
17 session::finalize_structural_grouped_projection_result,
18 },
19 error::InternalError,
20 traits::{CanisterKind, EntityValue},
21};
22
23#[allow(
32 clippy::large_enum_variant,
33 reason = "private dispatch enum keeps scalar, grouped, and delete outcomes explicit at the session boundary; boxing the grouped trace would add indirection to avoid a non-hot-path size warning"
34)]
35pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
36where
37 E: PersistedRow,
38{
39 Scalar {
40 rows: EntityResponse<E>,
41 #[cfg(feature = "diagnostics")]
42 phase: Option<ScalarExecutePhaseAttribution>,
43 #[cfg(feature = "diagnostics")]
44 response_decode_local_instructions: u64,
45 },
46 Grouped {
47 result: StructuralGroupedProjectionResult,
48 trace: Option<ExecutionTrace>,
49 #[cfg(feature = "diagnostics")]
50 phase: Option<GroupedExecutePhaseAttribution>,
51 },
52 Delete {
53 rows: EntityResponse<E>,
54 },
55 DeleteCount {
56 row_count: u32,
57 },
58}
59
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
70pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
71 Rows,
72 DeleteCount,
73}
74
75pub(in crate::db::session) fn query_error_from_executor_plan_error(
78 err: ExecutorPlanError,
79) -> QueryError {
80 match err {
81 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
82 }
83}
84
85impl<C: CanisterKind> DbSession<C> {
86 pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
89 family: ExecutionFamily,
90 ) -> Result<(), QueryError> {
91 match family {
92 ExecutionFamily::Ordered => Ok(()),
93 ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
94 }
95 }
96
97 pub(in crate::db::session::query) fn ensure_grouped_execution_family(
100 family: ExecutionFamily,
101 ) -> Result<(), QueryError> {
102 match family {
103 ExecutionFamily::Grouped => Ok(()),
104 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
105 }
106 }
107
108 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
110 where
111 E: PersistedRow<Canister = C> + EntityValue,
112 {
113 self.execute_query_result(query)
114 .and_then(LoadQueryResult::into_rows)
115 }
116
117 pub fn execute_scalar_query_rows<E>(
122 &self,
123 query: &Query<E>,
124 ) -> Result<EntityResponse<E>, QueryError>
125 where
126 E: PersistedRow<Canister = C> + EntityValue,
127 {
128 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
129
130 if plan.is_grouped() {
131 return Err(QueryError::invariant());
132 }
133
134 match plan.mode() {
135 QueryMode::Load(_) => self
136 .with_metrics(|| self.load_executor::<E>().execute(plan))
137 .map_err(QueryError::execute),
138 QueryMode::Delete(_) => Err(QueryError::unsupported_query()),
139 }
140 }
141
142 #[doc(hidden)]
145 pub fn execute_query_result<E>(
146 &self,
147 query: &Query<E>,
148 ) -> Result<LoadQueryResult<E>, QueryError>
149 where
150 E: PersistedRow<Canister = C> + EntityValue,
151 {
152 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
155
156 self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
159 .and_then(Self::load_result_from_prepared_outcome)
160 }
161
162 #[doc(hidden)]
164 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
165 where
166 E: PersistedRow<Canister = C> + EntityValue,
167 {
168 if !query.mode().is_delete() {
170 return Err(QueryError::unsupported_query());
171 }
172
173 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
177
178 match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
181 PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
182 PreparedQueryExecutionOutcome::Scalar { .. }
183 | PreparedQueryExecutionOutcome::Grouped { .. }
184 | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
185 }
186 }
187
188 pub(in crate::db::session::query) fn execute_prepared<E>(
192 &self,
193 plan: PreparedExecutionPlan<E>,
194 collect_attribution: bool,
195 output: PreparedQueryExecutionOutput,
196 ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
197 where
198 E: PersistedRow<Canister = C> + EntityValue,
199 {
200 #[cfg(not(feature = "diagnostics"))]
201 let _ = collect_attribution;
202
203 if plan.is_grouped() {
204 if output == PreparedQueryExecutionOutput::DeleteCount {
205 return Err(QueryError::invariant());
206 }
207
208 #[cfg(feature = "diagnostics")]
209 if collect_attribution {
210 let (result, trace, phase) =
211 self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
212 executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
213 plan, cursor,
214 )
215 })?;
216
217 return Ok(PreparedQueryExecutionOutcome::Grouped {
218 result,
219 trace,
220 phase: Some(phase),
221 });
222 }
223
224 let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
225
226 return Ok(PreparedQueryExecutionOutcome::Grouped {
227 result,
228 trace,
229 #[cfg(feature = "diagnostics")]
230 phase: None,
231 });
232 }
233
234 match plan.mode() {
235 QueryMode::Load(_) => {
236 if output == PreparedQueryExecutionOutput::DeleteCount {
237 return Err(QueryError::invariant());
238 }
239
240 #[cfg(feature = "diagnostics")]
241 if collect_attribution {
242 let (rows, phase, response_decode_local_instructions) = self
243 .load_executor::<E>()
244 .execute_with_phase_attribution(plan)
245 .map_err(QueryError::execute)?;
246
247 return Ok(PreparedQueryExecutionOutcome::Scalar {
248 rows,
249 phase: Some(phase),
250 response_decode_local_instructions,
251 });
252 }
253
254 let rows = self
255 .with_metrics(|| self.load_executor::<E>().execute(plan))
256 .map_err(QueryError::execute)?;
257
258 Ok(PreparedQueryExecutionOutcome::Scalar {
259 rows,
260 #[cfg(feature = "diagnostics")]
261 phase: None,
262 #[cfg(feature = "diagnostics")]
263 response_decode_local_instructions: 0,
264 })
265 }
266 QueryMode::Delete(_) => match output {
267 PreparedQueryExecutionOutput::Rows => {
268 let rows = self
269 .with_metrics(|| self.delete_executor::<E>().execute(plan))
270 .map_err(QueryError::execute)?;
271
272 Ok(PreparedQueryExecutionOutcome::Delete { rows })
273 }
274 PreparedQueryExecutionOutput::DeleteCount => {
275 let row_count = self
276 .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
277 .map_err(QueryError::execute)?;
278
279 Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
280 }
281 },
282 }
283 }
284
285 fn load_result_from_prepared_outcome<E>(
289 outcome: PreparedQueryExecutionOutcome<E>,
290 ) -> Result<LoadQueryResult<E>, QueryError>
291 where
292 E: PersistedRow<Canister = C> + EntityValue,
293 {
294 match outcome {
295 PreparedQueryExecutionOutcome::Scalar { rows, .. }
296 | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
297 PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
298 finalize_structural_grouped_projection_result(result, trace)
299 .map(LoadQueryResult::Grouped)
300 }
301 PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
302 }
303 }
304
305 pub(in crate::db) fn execute_with_plan<E, T>(
308 &self,
309 query: &Query<E>,
310 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
311 ) -> Result<T, QueryError>
312 where
313 E: PersistedRow<Canister = C> + EntityValue,
314 {
315 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
316
317 self.with_metrics(|| op(self.load_executor::<E>(), plan))
318 .map_err(QueryError::execute)
319 }
320}