1use crate::{
8 db::{
9 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
10 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11 access::AccessStrategy,
12 cursor::{
13 CursorPlanError, GroupedContinuationToken, decode_optional_cursor_token,
14 decode_optional_grouped_cursor_token,
15 },
16 diagnostics::ExecutionTrace,
17 executor::{
18 ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
19 },
20 query::builder::{
21 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
22 },
23 query::explain::{
24 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
25 },
26 query::intent::{CompiledQuery, PlannedQuery},
27 query::plan::QueryMode,
28 },
29 error::InternalError,
30 traits::{CanisterKind, EntityKind, EntityValue, Path},
31};
32
33impl<C: CanisterKind> DbSession<C> {
34 fn with_query_visible_indexes<E, T>(
37 &self,
38 query: &Query<E>,
39 op: impl FnOnce(
40 &Query<E>,
41 &crate::db::query::plan::VisibleIndexes<'static>,
42 ) -> Result<T, QueryError>,
43 ) -> Result<T, QueryError>
44 where
45 E: EntityKind<Canister = C>,
46 {
47 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
48
49 op(query, &visible_indexes)
50 }
51
52 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
55 &self,
56 query: &Query<E>,
57 ) -> Result<CompiledQuery<E>, QueryError>
58 where
59 E: EntityKind<Canister = C>,
60 {
61 self.with_query_visible_indexes(query, |query, visible_indexes| {
62 query.plan_with_visible_indexes(visible_indexes)
63 })
64 }
65
66 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
69 &self,
70 query: &Query<E>,
71 ) -> Result<PlannedQuery<E>, QueryError>
72 where
73 E: EntityKind<Canister = C>,
74 {
75 self.with_query_visible_indexes(query, |query, visible_indexes| {
76 query.planned_with_visible_indexes(visible_indexes)
77 })
78 }
79
80 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
82 &self,
83 query: &Query<E>,
84 ) -> Result<ExplainPlan, QueryError>
85 where
86 E: EntityKind<Canister = C>,
87 {
88 self.with_query_visible_indexes(query, |query, visible_indexes| {
89 query.explain_with_visible_indexes(visible_indexes)
90 })
91 }
92
93 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
96 &self,
97 query: &Query<E>,
98 ) -> Result<String, QueryError>
99 where
100 E: EntityKind<Canister = C>,
101 {
102 self.with_query_visible_indexes(query, |query, visible_indexes| {
103 query.plan_hash_hex_with_visible_indexes(visible_indexes)
104 })
105 }
106
107 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
110 &self,
111 query: &Query<E>,
112 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
113 where
114 E: EntityValue + EntityKind<Canister = C>,
115 {
116 self.with_query_visible_indexes(query, |query, visible_indexes| {
117 query.explain_execution_with_visible_indexes(visible_indexes)
118 })
119 }
120
121 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
124 &self,
125 query: &Query<E>,
126 ) -> Result<String, QueryError>
127 where
128 E: EntityValue + EntityKind<Canister = C>,
129 {
130 self.with_query_visible_indexes(query, |query, visible_indexes| {
131 query.explain_execution_text_with_visible_indexes(visible_indexes)
132 })
133 }
134
135 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
138 &self,
139 query: &Query<E>,
140 ) -> Result<String, QueryError>
141 where
142 E: EntityValue + EntityKind<Canister = C>,
143 {
144 self.with_query_visible_indexes(query, |query, visible_indexes| {
145 query.explain_execution_json_with_visible_indexes(visible_indexes)
146 })
147 }
148
149 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
152 &self,
153 query: &Query<E>,
154 ) -> Result<String, QueryError>
155 where
156 E: EntityValue + EntityKind<Canister = C>,
157 {
158 self.with_query_visible_indexes(query, |query, visible_indexes| {
159 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
160 })
161 }
162
163 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
166 &self,
167 query: &Query<E>,
168 strategy: &S,
169 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
170 where
171 E: EntityValue + EntityKind<Canister = C>,
172 S: PreparedFluentAggregateExplainStrategy,
173 {
174 self.with_query_visible_indexes(query, |query, visible_indexes| {
175 query
176 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
177 })
178 }
179
180 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
183 &self,
184 query: &Query<E>,
185 target_field: &str,
186 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
187 where
188 E: EntityValue + EntityKind<Canister = C>,
189 {
190 self.with_query_visible_indexes(query, |query, visible_indexes| {
191 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
192 })
193 }
194
195 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
198 &self,
199 query: &Query<E>,
200 strategy: &PreparedFluentProjectionStrategy,
201 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
202 where
203 E: EntityValue + EntityKind<Canister = C>,
204 {
205 self.with_query_visible_indexes(query, |query, visible_indexes| {
206 query.explain_prepared_projection_terminal_with_visible_indexes(
207 visible_indexes,
208 strategy,
209 )
210 })
211 }
212
213 fn ensure_scalar_paged_execution_strategy(
216 strategy: ExecutionStrategy,
217 ) -> Result<(), QueryError> {
218 match strategy {
219 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
220 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
221 )),
222 ExecutionStrategy::Ordered => Ok(()),
223 ExecutionStrategy::Grouped => Err(QueryError::invariant(
224 "grouped plans require execute_grouped(...)",
225 )),
226 }
227 }
228
229 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
232 match strategy {
233 ExecutionStrategy::Grouped => Ok(()),
234 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
235 QueryError::invariant("execute_grouped requires grouped logical plans"),
236 ),
237 }
238 }
239
240 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
242 where
243 E: PersistedRow<Canister = C> + EntityValue,
244 {
245 let mode = query.mode();
247 let plan = self
248 .compile_query_with_visible_indexes(query)?
249 .into_executable();
250
251 self.execute_query_dyn(mode, plan)
253 }
254
255 #[doc(hidden)]
257 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
258 where
259 E: PersistedRow<Canister = C> + EntityValue,
260 {
261 if !query.mode().is_delete() {
263 return Err(QueryError::unsupported_query(
264 "delete count execution requires delete query mode",
265 ));
266 }
267
268 let plan = self
270 .compile_query_with_visible_indexes(query)?
271 .into_executable();
272
273 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
275 .map_err(QueryError::execute)
276 }
277
278 pub(in crate::db) fn execute_query_dyn<E>(
283 &self,
284 mode: QueryMode,
285 plan: ExecutablePlan<E>,
286 ) -> Result<EntityResponse<E>, QueryError>
287 where
288 E: PersistedRow<Canister = C> + EntityValue,
289 {
290 let result = match mode {
291 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
292 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
293 };
294
295 result.map_err(QueryError::execute)
296 }
297
298 pub(in crate::db) fn execute_load_query_with<E, T>(
301 &self,
302 query: &Query<E>,
303 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
304 ) -> Result<T, QueryError>
305 where
306 E: PersistedRow<Canister = C> + EntityValue,
307 {
308 let plan = self
309 .compile_query_with_visible_indexes(query)?
310 .into_executable();
311
312 self.with_metrics(|| op(self.load_executor::<E>(), plan))
313 .map_err(QueryError::execute)
314 }
315
316 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
321 where
322 E: EntityKind<Canister = C>,
323 {
324 let compiled = self.compile_query_with_visible_indexes(query)?;
325 let explain = compiled.explain();
326 let plan_hash = compiled.plan_hash_hex();
327
328 let executable = compiled.into_executable();
329 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
330 let execution_strategy = match query.mode() {
331 QueryMode::Load(_) => Some(
332 executable
333 .execution_strategy()
334 .map_err(QueryError::execute)?,
335 ),
336 QueryMode::Delete(_) => None,
337 };
338
339 Ok(QueryTracePlan::new(
340 plan_hash,
341 access_strategy,
342 execution_strategy,
343 explain,
344 ))
345 }
346
347 pub(crate) fn execute_load_query_paged_with_trace<E>(
349 &self,
350 query: &Query<E>,
351 cursor_token: Option<&str>,
352 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
353 where
354 E: PersistedRow<Canister = C> + EntityValue,
355 {
356 let plan = self
358 .compile_query_with_visible_indexes(query)?
359 .into_executable();
360 Self::ensure_scalar_paged_execution_strategy(
361 plan.execution_strategy().map_err(QueryError::execute)?,
362 )?;
363
364 let cursor_bytes = decode_optional_cursor_token(cursor_token)
366 .map_err(QueryError::from_cursor_plan_error)?;
367 let cursor = plan
368 .prepare_cursor(cursor_bytes.as_deref())
369 .map_err(QueryError::from_executor_plan_error)?;
370
371 let (page, trace) = self
373 .with_metrics(|| {
374 self.load_executor::<E>()
375 .execute_paged_with_cursor_traced(plan, cursor)
376 })
377 .map_err(QueryError::execute)?;
378 let next_cursor = page
379 .next_cursor
380 .map(|token| {
381 let Some(token) = token.as_scalar() else {
382 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
383 };
384
385 token.encode().map_err(|err| {
386 QueryError::serialize_internal(format!(
387 "failed to serialize continuation cursor: {err}"
388 ))
389 })
390 })
391 .transpose()?;
392
393 Ok(PagedLoadExecutionWithTrace::new(
394 page.items,
395 next_cursor,
396 trace,
397 ))
398 }
399
400 pub fn execute_grouped<E>(
405 &self,
406 query: &Query<E>,
407 cursor_token: Option<&str>,
408 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
409 where
410 E: PersistedRow<Canister = C> + EntityValue,
411 {
412 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
413 let next_cursor = page
414 .next_cursor
415 .map(|token| {
416 let Some(token) = token.as_grouped() else {
417 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
418 };
419
420 token.encode().map_err(|err| {
421 QueryError::serialize_internal(format!(
422 "failed to serialize grouped continuation cursor: {err}"
423 ))
424 })
425 })
426 .transpose()?;
427
428 Ok(PagedGroupedExecutionWithTrace::new(
429 page.rows,
430 next_cursor,
431 trace,
432 ))
433 }
434
435 #[doc(hidden)]
437 pub fn execute_grouped_text_cursor<E>(
438 &self,
439 query: &Query<E>,
440 cursor_token: Option<&str>,
441 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
442 where
443 E: PersistedRow<Canister = C> + EntityValue,
444 {
445 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
446 let next_cursor = page
447 .next_cursor
448 .map(Self::encode_grouped_page_cursor_hex)
449 .transpose()?;
450
451 Ok((page.rows, next_cursor, trace))
452 }
453}
454
455impl<C: CanisterKind> DbSession<C> {
456 fn execute_grouped_page_with_trace<E>(
459 &self,
460 query: &Query<E>,
461 cursor_token: Option<&str>,
462 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
463 where
464 E: PersistedRow<Canister = C> + EntityValue,
465 {
466 let plan = self
468 .compile_query_with_visible_indexes(query)?
469 .into_executable();
470 Self::ensure_grouped_execution_strategy(
471 plan.execution_strategy().map_err(QueryError::execute)?,
472 )?;
473
474 let cursor = decode_optional_grouped_cursor_token(cursor_token)
476 .map_err(QueryError::from_cursor_plan_error)?;
477 let cursor = plan
478 .prepare_grouped_cursor_token(cursor)
479 .map_err(QueryError::from_executor_plan_error)?;
480
481 self.with_metrics(|| {
484 self.load_executor::<E>()
485 .execute_grouped_paged_with_cursor_traced(plan, cursor)
486 })
487 .map_err(QueryError::execute)
488 }
489
490 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
493 let token: &GroupedContinuationToken = page_cursor
494 .as_grouped()
495 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
496
497 token.encode_hex().map_err(|err| {
498 QueryError::serialize_internal(format!(
499 "failed to serialize grouped continuation cursor: {err}"
500 ))
501 })
502 }
503}