icydb_core/db/session/query/
execution.rs1#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9 db::{
10 DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11 diagnostics::ExecutionTrace,
12 executor::{
13 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14 StructuralGroupedProjectionResult,
15 },
16 query::plan::QueryMode,
17 session::finalize_structural_grouped_projection_result,
18 },
19 error::InternalError,
20 traits::{CanisterKind, EntityValue},
21};
22
23#[cfg_attr(
32 not(feature = "diagnostics"),
33 expect(
34 clippy::large_enum_variant,
35 reason = "non-diagnostics builds keep the grouped execution trace inline to avoid boxing a private session-boundary outcome"
36 )
37)]
38pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
39where
40 E: PersistedRow,
41{
42 Scalar {
43 rows: EntityResponse<E>,
44 #[cfg(feature = "diagnostics")]
45 phase: Option<ScalarExecutePhaseAttribution>,
46 #[cfg(feature = "diagnostics")]
47 response_decode_local_instructions: u64,
48 },
49 Grouped {
50 result: StructuralGroupedProjectionResult,
51 trace: Option<ExecutionTrace>,
52 #[cfg(feature = "diagnostics")]
53 phase: Option<GroupedExecutePhaseAttribution>,
54 },
55 Delete {
56 rows: EntityResponse<E>,
57 },
58 DeleteCount {
59 row_count: u32,
60 },
61}
62
63#[derive(Clone, Copy, Debug, Eq, PartialEq)]
73pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
74 Rows,
75 DeleteCount,
76}
77
78pub(in crate::db::session) fn query_error_from_executor_plan_error(
81 err: ExecutorPlanError,
82) -> QueryError {
83 match err {
84 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
85 }
86}
87
88impl<C: CanisterKind> DbSession<C> {
89 pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
92 family: ExecutionFamily,
93 ) -> Result<(), QueryError> {
94 match family {
95 ExecutionFamily::Ordered => Ok(()),
96 ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
97 }
98 }
99
100 pub(in crate::db::session::query) fn ensure_grouped_execution_family(
103 family: ExecutionFamily,
104 ) -> Result<(), QueryError> {
105 match family {
106 ExecutionFamily::Grouped => Ok(()),
107 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
108 }
109 }
110
111 pub(in crate::db) fn execute_query<E>(
113 &self,
114 query: &Query<E>,
115 ) -> Result<EntityResponse<E>, QueryError>
116 where
117 E: PersistedRow<Canister = C> + EntityValue,
118 {
119 self.execute_query_result(query)
120 .and_then(LoadQueryResult::into_rows)
121 }
122
123 pub fn execute_scalar_query_rows<E>(
128 &self,
129 query: &Query<E>,
130 ) -> Result<EntityResponse<E>, QueryError>
131 where
132 E: PersistedRow<Canister = C> + EntityValue,
133 {
134 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
135
136 if plan.is_grouped() {
137 return Err(QueryError::invariant());
138 }
139
140 match plan.mode() {
141 QueryMode::Load(_) => self
142 .with_metrics(|| self.load_executor::<E>().execute(plan))
143 .map_err(QueryError::execute),
144 QueryMode::Delete(_) => Err(QueryError::unsupported_query()),
145 }
146 }
147
148 #[doc(hidden)]
151 pub fn execute_query_result<E>(
152 &self,
153 query: &Query<E>,
154 ) -> Result<LoadQueryResult<E>, QueryError>
155 where
156 E: PersistedRow<Canister = C> + EntityValue,
157 {
158 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
161
162 self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
165 .and_then(Self::load_result_from_prepared_outcome)
166 }
167
168 #[doc(hidden)]
170 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
171 where
172 E: PersistedRow<Canister = C> + EntityValue,
173 {
174 if !query.mode().is_delete() {
176 return Err(QueryError::unsupported_query());
177 }
178
179 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
183
184 match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
187 PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
188 PreparedQueryExecutionOutcome::Scalar { .. }
189 | PreparedQueryExecutionOutcome::Grouped { .. }
190 | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
191 }
192 }
193
194 pub(in crate::db::session::query) fn execute_prepared<E>(
198 &self,
199 plan: PreparedExecutionPlan<E>,
200 collect_attribution: bool,
201 output: PreparedQueryExecutionOutput,
202 ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
203 where
204 E: PersistedRow<Canister = C> + EntityValue,
205 {
206 #[cfg(not(feature = "diagnostics"))]
207 let _ = collect_attribution;
208
209 if plan.is_grouped() {
210 if output == PreparedQueryExecutionOutput::DeleteCount {
211 return Err(QueryError::invariant());
212 }
213
214 #[cfg(feature = "diagnostics")]
215 if collect_attribution {
216 let (result, trace, phase) =
217 self.execute_grouped_with_phase_attribution(plan, None)?;
218
219 return Ok(PreparedQueryExecutionOutcome::Grouped {
220 result,
221 trace,
222 phase: Some(phase),
223 });
224 }
225
226 let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
227
228 return Ok(PreparedQueryExecutionOutcome::Grouped {
229 result,
230 trace,
231 #[cfg(feature = "diagnostics")]
232 phase: None,
233 });
234 }
235
236 match plan.mode() {
237 QueryMode::Load(_) => {
238 if output == PreparedQueryExecutionOutput::DeleteCount {
239 return Err(QueryError::invariant());
240 }
241
242 #[cfg(feature = "diagnostics")]
243 if collect_attribution {
244 let (rows, phase, response_decode_local_instructions) = self
245 .load_executor::<E>()
246 .execute_with_phase_attribution(plan)
247 .map_err(QueryError::execute)?;
248
249 return Ok(PreparedQueryExecutionOutcome::Scalar {
250 rows,
251 phase: Some(phase),
252 response_decode_local_instructions,
253 });
254 }
255
256 let rows = self
257 .with_metrics(|| self.load_executor::<E>().execute(plan))
258 .map_err(QueryError::execute)?;
259
260 Ok(PreparedQueryExecutionOutcome::Scalar {
261 rows,
262 #[cfg(feature = "diagnostics")]
263 phase: None,
264 #[cfg(feature = "diagnostics")]
265 response_decode_local_instructions: 0,
266 })
267 }
268 QueryMode::Delete(_) => match output {
269 PreparedQueryExecutionOutput::Rows => {
270 let rows = self
271 .with_metrics(|| self.delete_executor::<E>().execute(plan))
272 .map_err(QueryError::execute)?;
273
274 Ok(PreparedQueryExecutionOutcome::Delete { rows })
275 }
276 PreparedQueryExecutionOutput::DeleteCount => {
277 let row_count = self
278 .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
279 .map_err(QueryError::execute)?;
280
281 Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
282 }
283 },
284 }
285 }
286
287 fn load_result_from_prepared_outcome<E>(
291 outcome: PreparedQueryExecutionOutcome<E>,
292 ) -> Result<LoadQueryResult<E>, QueryError>
293 where
294 E: PersistedRow<Canister = C> + EntityValue,
295 {
296 match outcome {
297 PreparedQueryExecutionOutcome::Scalar { rows, .. }
298 | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
299 PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
300 finalize_structural_grouped_projection_result(result, trace)
301 .map(LoadQueryResult::Grouped)
302 }
303 PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
304 }
305 }
306
307 pub(in crate::db) fn execute_with_plan<E, T>(
310 &self,
311 query: &Query<E>,
312 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
313 ) -> Result<T, QueryError>
314 where
315 E: PersistedRow<Canister = C> + EntityValue,
316 {
317 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
318
319 self.with_metrics(|| op(self.load_executor::<E>(), plan))
320 .map_err(QueryError::execute)
321 }
322}