icydb_core/db/session/query/
execution.rs1#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9 db::{
10 DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11 diagnostics::ExecutionTrace,
12 executor::{
13 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14 StructuralGroupedProjectionResult,
15 },
16 query::plan::QueryMode,
17 session::finalize_structural_grouped_projection_result,
18 },
19 error::InternalError,
20 traits::{CanisterKind, EntityValue},
21};
22
23#[allow(
32 clippy::large_enum_variant,
33 reason = "private dispatch enum keeps scalar, grouped, and delete outcomes explicit at the session boundary; boxing the grouped trace would add indirection to avoid a non-hot-path size warning"
34)]
35pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
36where
37 E: PersistedRow,
38{
39 Scalar {
40 rows: EntityResponse<E>,
41 #[cfg(feature = "diagnostics")]
42 phase: Option<ScalarExecutePhaseAttribution>,
43 #[cfg(feature = "diagnostics")]
44 response_decode_local_instructions: u64,
45 },
46 Grouped {
47 result: StructuralGroupedProjectionResult,
48 trace: Option<ExecutionTrace>,
49 #[cfg(feature = "diagnostics")]
50 phase: Option<GroupedExecutePhaseAttribution>,
51 },
52 Delete {
53 rows: EntityResponse<E>,
54 },
55 DeleteCount {
56 row_count: u32,
57 },
58}
59
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
70pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
71 Rows,
72 DeleteCount,
73}
74
75pub(in crate::db::session) fn query_error_from_executor_plan_error(
78 err: ExecutorPlanError,
79) -> QueryError {
80 match err {
81 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
82 }
83}
84
85impl<C: CanisterKind> DbSession<C> {
86 pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
89 family: ExecutionFamily,
90 ) -> Result<(), QueryError> {
91 match family {
92 ExecutionFamily::Ordered => Ok(()),
93 ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
94 }
95 }
96
97 pub(in crate::db::session::query) fn ensure_grouped_execution_family(
100 family: ExecutionFamily,
101 ) -> Result<(), QueryError> {
102 match family {
103 ExecutionFamily::Grouped => Ok(()),
104 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
105 }
106 }
107
108 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
110 where
111 E: PersistedRow<Canister = C> + EntityValue,
112 {
113 self.execute_query_result(query)
114 .and_then(LoadQueryResult::into_rows)
115 }
116
117 #[doc(hidden)]
120 pub fn execute_query_result<E>(
121 &self,
122 query: &Query<E>,
123 ) -> Result<LoadQueryResult<E>, QueryError>
124 where
125 E: PersistedRow<Canister = C> + EntityValue,
126 {
127 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
130
131 self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
134 .and_then(Self::load_result_from_prepared_outcome)
135 }
136
137 #[doc(hidden)]
139 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
140 where
141 E: PersistedRow<Canister = C> + EntityValue,
142 {
143 if !query.mode().is_delete() {
145 return Err(QueryError::unsupported_query());
146 }
147
148 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
152
153 match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
156 PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
157 PreparedQueryExecutionOutcome::Scalar { .. }
158 | PreparedQueryExecutionOutcome::Grouped { .. }
159 | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
160 }
161 }
162
163 pub(in crate::db::session::query) fn execute_prepared<E>(
167 &self,
168 plan: PreparedExecutionPlan<E>,
169 collect_attribution: bool,
170 output: PreparedQueryExecutionOutput,
171 ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
172 where
173 E: PersistedRow<Canister = C> + EntityValue,
174 {
175 #[cfg(not(feature = "diagnostics"))]
176 let _ = collect_attribution;
177
178 if plan.is_grouped() {
179 if output == PreparedQueryExecutionOutput::DeleteCount {
180 return Err(QueryError::invariant());
181 }
182
183 #[cfg(feature = "diagnostics")]
184 if collect_attribution {
185 let (result, trace, phase) =
186 self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
187 executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
188 plan, cursor,
189 )
190 })?;
191
192 return Ok(PreparedQueryExecutionOutcome::Grouped {
193 result,
194 trace,
195 phase: Some(phase),
196 });
197 }
198
199 let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
200
201 return Ok(PreparedQueryExecutionOutcome::Grouped {
202 result,
203 trace,
204 #[cfg(feature = "diagnostics")]
205 phase: None,
206 });
207 }
208
209 match plan.mode() {
210 QueryMode::Load(_) => {
211 if output == PreparedQueryExecutionOutput::DeleteCount {
212 return Err(QueryError::invariant());
213 }
214
215 #[cfg(feature = "diagnostics")]
216 if collect_attribution {
217 let (rows, phase, response_decode_local_instructions) = self
218 .load_executor::<E>()
219 .execute_with_phase_attribution(plan)
220 .map_err(QueryError::execute)?;
221
222 return Ok(PreparedQueryExecutionOutcome::Scalar {
223 rows,
224 phase: Some(phase),
225 response_decode_local_instructions,
226 });
227 }
228
229 let rows = self
230 .with_metrics(|| self.load_executor::<E>().execute(plan))
231 .map_err(QueryError::execute)?;
232
233 Ok(PreparedQueryExecutionOutcome::Scalar {
234 rows,
235 #[cfg(feature = "diagnostics")]
236 phase: None,
237 #[cfg(feature = "diagnostics")]
238 response_decode_local_instructions: 0,
239 })
240 }
241 QueryMode::Delete(_) => match output {
242 PreparedQueryExecutionOutput::Rows => {
243 let rows = self
244 .with_metrics(|| self.delete_executor::<E>().execute(plan))
245 .map_err(QueryError::execute)?;
246
247 Ok(PreparedQueryExecutionOutcome::Delete { rows })
248 }
249 PreparedQueryExecutionOutput::DeleteCount => {
250 let row_count = self
251 .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
252 .map_err(QueryError::execute)?;
253
254 Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
255 }
256 },
257 }
258 }
259
260 fn load_result_from_prepared_outcome<E>(
264 outcome: PreparedQueryExecutionOutcome<E>,
265 ) -> Result<LoadQueryResult<E>, QueryError>
266 where
267 E: PersistedRow<Canister = C> + EntityValue,
268 {
269 match outcome {
270 PreparedQueryExecutionOutcome::Scalar { rows, .. }
271 | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
272 PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
273 finalize_structural_grouped_projection_result(result, trace)
274 .map(LoadQueryResult::Grouped)
275 }
276 PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
277 }
278 }
279
280 pub(in crate::db) fn execute_with_plan<E, T>(
283 &self,
284 query: &Query<E>,
285 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
286 ) -> Result<T, QueryError>
287 where
288 E: PersistedRow<Canister = C> + EntityValue,
289 {
290 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
291
292 self.with_metrics(|| op(self.load_executor::<E>(), plan))
293 .map_err(QueryError::execute)
294 }
295}