icydb_core/db/session/query/
execution.rs1#[cfg(feature = "diagnostics")]
7use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
8use crate::{
9 db::{
10 DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
11 diagnostics::ExecutionTrace,
12 executor::{
13 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
14 StructuralGroupedProjectionResult,
15 },
16 query::plan::QueryMode,
17 session::finalize_structural_grouped_projection_result,
18 },
19 error::InternalError,
20 traits::{CanisterKind, EntityValue},
21};
22
23#[cfg_attr(
32 not(feature = "diagnostics"),
33 expect(
34 clippy::large_enum_variant,
35 reason = "non-diagnostics builds keep the grouped execution trace inline to avoid boxing a private session-boundary outcome"
36 )
37)]
38pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
39where
40 E: PersistedRow,
41{
42 Scalar {
43 rows: EntityResponse<E>,
44 #[cfg(feature = "diagnostics")]
45 phase: Option<ScalarExecutePhaseAttribution>,
46 #[cfg(feature = "diagnostics")]
47 response_decode_local_instructions: u64,
48 },
49 Grouped {
50 result: StructuralGroupedProjectionResult,
51 trace: Option<ExecutionTrace>,
52 #[cfg(feature = "diagnostics")]
53 phase: Option<GroupedExecutePhaseAttribution>,
54 },
55 Delete {
56 rows: EntityResponse<E>,
57 },
58 DeleteCount {
59 row_count: u32,
60 },
61}
62
63#[derive(Clone, Copy, Debug, Eq, PartialEq)]
73pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
74 Rows,
75 DeleteCount,
76}
77
78pub(in crate::db::session) fn query_error_from_executor_plan_error(
81 err: ExecutorPlanError,
82) -> QueryError {
83 match err {
84 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
85 }
86}
87
88impl<C: CanisterKind> DbSession<C> {
89 pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
92 family: ExecutionFamily,
93 ) -> Result<(), QueryError> {
94 match family {
95 ExecutionFamily::Ordered => Ok(()),
96 ExecutionFamily::PrimaryKey | ExecutionFamily::Grouped => Err(QueryError::invariant()),
97 }
98 }
99
100 pub(in crate::db::session::query) fn ensure_grouped_execution_family(
103 family: ExecutionFamily,
104 ) -> Result<(), QueryError> {
105 match family {
106 ExecutionFamily::Grouped => Ok(()),
107 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant()),
108 }
109 }
110
111 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
113 where
114 E: PersistedRow<Canister = C> + EntityValue,
115 {
116 self.execute_query_result(query)
117 .and_then(LoadQueryResult::into_rows)
118 }
119
120 pub fn execute_scalar_query_rows<E>(
125 &self,
126 query: &Query<E>,
127 ) -> Result<EntityResponse<E>, QueryError>
128 where
129 E: PersistedRow<Canister = C> + EntityValue,
130 {
131 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
132
133 if plan.is_grouped() {
134 return Err(QueryError::invariant());
135 }
136
137 match plan.mode() {
138 QueryMode::Load(_) => self
139 .with_metrics(|| self.load_executor::<E>().execute(plan))
140 .map_err(QueryError::execute),
141 QueryMode::Delete(_) => Err(QueryError::unsupported_query()),
142 }
143 }
144
145 #[doc(hidden)]
148 pub fn execute_query_result<E>(
149 &self,
150 query: &Query<E>,
151 ) -> Result<LoadQueryResult<E>, QueryError>
152 where
153 E: PersistedRow<Canister = C> + EntityValue,
154 {
155 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
158
159 self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
162 .and_then(Self::load_result_from_prepared_outcome)
163 }
164
165 #[doc(hidden)]
167 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
168 where
169 E: PersistedRow<Canister = C> + EntityValue,
170 {
171 if !query.mode().is_delete() {
173 return Err(QueryError::unsupported_query());
174 }
175
176 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
180
181 match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
184 PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
185 PreparedQueryExecutionOutcome::Scalar { .. }
186 | PreparedQueryExecutionOutcome::Grouped { .. }
187 | PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant()),
188 }
189 }
190
191 pub(in crate::db::session::query) fn execute_prepared<E>(
195 &self,
196 plan: PreparedExecutionPlan<E>,
197 collect_attribution: bool,
198 output: PreparedQueryExecutionOutput,
199 ) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
200 where
201 E: PersistedRow<Canister = C> + EntityValue,
202 {
203 #[cfg(not(feature = "diagnostics"))]
204 let _ = collect_attribution;
205
206 if plan.is_grouped() {
207 if output == PreparedQueryExecutionOutput::DeleteCount {
208 return Err(QueryError::invariant());
209 }
210
211 #[cfg(feature = "diagnostics")]
212 if collect_attribution {
213 let (result, trace, phase) =
214 self.execute_grouped_with_phase_attribution(plan, None)?;
215
216 return Ok(PreparedQueryExecutionOutcome::Grouped {
217 result,
218 trace,
219 phase: Some(phase),
220 });
221 }
222
223 let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
224
225 return Ok(PreparedQueryExecutionOutcome::Grouped {
226 result,
227 trace,
228 #[cfg(feature = "diagnostics")]
229 phase: None,
230 });
231 }
232
233 match plan.mode() {
234 QueryMode::Load(_) => {
235 if output == PreparedQueryExecutionOutput::DeleteCount {
236 return Err(QueryError::invariant());
237 }
238
239 #[cfg(feature = "diagnostics")]
240 if collect_attribution {
241 let (rows, phase, response_decode_local_instructions) = self
242 .load_executor::<E>()
243 .execute_with_phase_attribution(plan)
244 .map_err(QueryError::execute)?;
245
246 return Ok(PreparedQueryExecutionOutcome::Scalar {
247 rows,
248 phase: Some(phase),
249 response_decode_local_instructions,
250 });
251 }
252
253 let rows = self
254 .with_metrics(|| self.load_executor::<E>().execute(plan))
255 .map_err(QueryError::execute)?;
256
257 Ok(PreparedQueryExecutionOutcome::Scalar {
258 rows,
259 #[cfg(feature = "diagnostics")]
260 phase: None,
261 #[cfg(feature = "diagnostics")]
262 response_decode_local_instructions: 0,
263 })
264 }
265 QueryMode::Delete(_) => match output {
266 PreparedQueryExecutionOutput::Rows => {
267 let rows = self
268 .with_metrics(|| self.delete_executor::<E>().execute(plan))
269 .map_err(QueryError::execute)?;
270
271 Ok(PreparedQueryExecutionOutcome::Delete { rows })
272 }
273 PreparedQueryExecutionOutput::DeleteCount => {
274 let row_count = self
275 .with_metrics(|| self.delete_executor::<E>().execute_count(plan))
276 .map_err(QueryError::execute)?;
277
278 Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
279 }
280 },
281 }
282 }
283
284 fn load_result_from_prepared_outcome<E>(
288 outcome: PreparedQueryExecutionOutcome<E>,
289 ) -> Result<LoadQueryResult<E>, QueryError>
290 where
291 E: PersistedRow<Canister = C> + EntityValue,
292 {
293 match outcome {
294 PreparedQueryExecutionOutcome::Scalar { rows, .. }
295 | PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
296 PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
297 finalize_structural_grouped_projection_result(result, trace)
298 .map(LoadQueryResult::Grouped)
299 }
300 PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
301 }
302 }
303
304 pub(in crate::db) fn execute_with_plan<E, T>(
307 &self,
308 query: &Query<E>,
309 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
310 ) -> Result<T, QueryError>
311 where
312 E: PersistedRow<Canister = C> + EntityValue,
313 {
314 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
315
316 self.with_metrics(|| op(self.load_executor::<E>(), plan))
317 .map_err(QueryError::execute)
318 }
319}