1use crate::{
8 db::{
9 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
10 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11 access::AccessStrategy,
12 cursor::{
13 CursorPlanError, GroupedContinuationToken, decode_optional_cursor_token,
14 decode_optional_grouped_cursor_token,
15 },
16 diagnostics::ExecutionTrace,
17 executor::{
18 ExecutionFamily, GroupedCursorPage, LoadExecutor, PageCursor, PreparedExecutionPlan,
19 },
20 query::builder::{
21 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
22 },
23 query::explain::{
24 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
25 },
26 query::intent::{CompiledQuery, PlannedQuery},
27 query::plan::QueryMode,
28 },
29 error::InternalError,
30 traits::{CanisterKind, EntityKind, EntityValue, Path},
31};
32
33impl<C: CanisterKind> DbSession<C> {
34 fn with_query_visible_indexes<E, T>(
37 &self,
38 query: &Query<E>,
39 op: impl FnOnce(
40 &Query<E>,
41 &crate::db::query::plan::VisibleIndexes<'static>,
42 ) -> Result<T, QueryError>,
43 ) -> Result<T, QueryError>
44 where
45 E: EntityKind<Canister = C>,
46 {
47 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
48
49 op(query, &visible_indexes)
50 }
51
52 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
55 &self,
56 query: &Query<E>,
57 ) -> Result<CompiledQuery<E>, QueryError>
58 where
59 E: EntityKind<Canister = C>,
60 {
61 self.with_query_visible_indexes(query, |query, visible_indexes| {
62 query.plan_with_visible_indexes(visible_indexes)
63 })
64 }
65
66 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
69 &self,
70 query: &Query<E>,
71 ) -> Result<PlannedQuery<E>, QueryError>
72 where
73 E: EntityKind<Canister = C>,
74 {
75 self.with_query_visible_indexes(query, |query, visible_indexes| {
76 query.planned_with_visible_indexes(visible_indexes)
77 })
78 }
79
80 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
82 &self,
83 query: &Query<E>,
84 ) -> Result<ExplainPlan, QueryError>
85 where
86 E: EntityKind<Canister = C>,
87 {
88 self.with_query_visible_indexes(query, |query, visible_indexes| {
89 query.explain_with_visible_indexes(visible_indexes)
90 })
91 }
92
93 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
96 &self,
97 query: &Query<E>,
98 ) -> Result<String, QueryError>
99 where
100 E: EntityKind<Canister = C>,
101 {
102 self.with_query_visible_indexes(query, |query, visible_indexes| {
103 query.plan_hash_hex_with_visible_indexes(visible_indexes)
104 })
105 }
106
107 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
110 &self,
111 query: &Query<E>,
112 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
113 where
114 E: EntityValue + EntityKind<Canister = C>,
115 {
116 self.with_query_visible_indexes(query, |query, visible_indexes| {
117 query.explain_execution_with_visible_indexes(visible_indexes)
118 })
119 }
120
121 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
124 &self,
125 query: &Query<E>,
126 ) -> Result<String, QueryError>
127 where
128 E: EntityValue + EntityKind<Canister = C>,
129 {
130 self.with_query_visible_indexes(query, |query, visible_indexes| {
131 query.explain_execution_text_with_visible_indexes(visible_indexes)
132 })
133 }
134
135 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
138 &self,
139 query: &Query<E>,
140 ) -> Result<String, QueryError>
141 where
142 E: EntityValue + EntityKind<Canister = C>,
143 {
144 self.with_query_visible_indexes(query, |query, visible_indexes| {
145 query.explain_execution_json_with_visible_indexes(visible_indexes)
146 })
147 }
148
149 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
152 &self,
153 query: &Query<E>,
154 ) -> Result<String, QueryError>
155 where
156 E: EntityValue + EntityKind<Canister = C>,
157 {
158 self.with_query_visible_indexes(query, |query, visible_indexes| {
159 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
160 })
161 }
162
163 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
166 &self,
167 query: &Query<E>,
168 strategy: &S,
169 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
170 where
171 E: EntityValue + EntityKind<Canister = C>,
172 S: PreparedFluentAggregateExplainStrategy,
173 {
174 self.with_query_visible_indexes(query, |query, visible_indexes| {
175 query
176 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
177 })
178 }
179
180 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
183 &self,
184 query: &Query<E>,
185 target_field: &str,
186 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
187 where
188 E: EntityValue + EntityKind<Canister = C>,
189 {
190 self.with_query_visible_indexes(query, |query, visible_indexes| {
191 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
192 })
193 }
194
195 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
198 &self,
199 query: &Query<E>,
200 strategy: &PreparedFluentProjectionStrategy,
201 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
202 where
203 E: EntityValue + EntityKind<Canister = C>,
204 {
205 self.with_query_visible_indexes(query, |query, visible_indexes| {
206 query.explain_prepared_projection_terminal_with_visible_indexes(
207 visible_indexes,
208 strategy,
209 )
210 })
211 }
212
213 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
216 match family {
217 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
218 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
219 )),
220 ExecutionFamily::Ordered => Ok(()),
221 ExecutionFamily::Grouped => Err(QueryError::invariant(
222 "grouped plans require execute_grouped(...)",
223 )),
224 }
225 }
226
227 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
230 match family {
231 ExecutionFamily::Grouped => Ok(()),
232 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
233 "execute_grouped requires grouped logical plans",
234 )),
235 }
236 }
237
238 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
240 where
241 E: PersistedRow<Canister = C> + EntityValue,
242 {
243 let mode = query.mode();
245 let plan = self
246 .compile_query_with_visible_indexes(query)?
247 .into_prepared_execution_plan();
248
249 self.execute_query_dyn(mode, plan)
251 }
252
253 #[doc(hidden)]
255 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
256 where
257 E: PersistedRow<Canister = C> + EntityValue,
258 {
259 if !query.mode().is_delete() {
261 return Err(QueryError::unsupported_query(
262 "delete count execution requires delete query mode",
263 ));
264 }
265
266 let plan = self
268 .compile_query_with_visible_indexes(query)?
269 .into_prepared_execution_plan();
270
271 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
273 .map_err(QueryError::execute)
274 }
275
276 pub(in crate::db) fn execute_query_dyn<E>(
281 &self,
282 mode: QueryMode,
283 plan: PreparedExecutionPlan<E>,
284 ) -> Result<EntityResponse<E>, QueryError>
285 where
286 E: PersistedRow<Canister = C> + EntityValue,
287 {
288 let result = match mode {
289 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
290 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
291 };
292
293 result.map_err(QueryError::execute)
294 }
295
296 pub(in crate::db) fn execute_load_query_with<E, T>(
299 &self,
300 query: &Query<E>,
301 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
302 ) -> Result<T, QueryError>
303 where
304 E: PersistedRow<Canister = C> + EntityValue,
305 {
306 let plan = self
307 .compile_query_with_visible_indexes(query)?
308 .into_prepared_execution_plan();
309
310 self.with_metrics(|| op(self.load_executor::<E>(), plan))
311 .map_err(QueryError::execute)
312 }
313
314 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
319 where
320 E: EntityKind<Canister = C>,
321 {
322 let compiled = self.compile_query_with_visible_indexes(query)?;
323 let explain = compiled.explain();
324 let plan_hash = compiled.plan_hash_hex();
325
326 let executable = compiled.into_prepared_execution_plan();
327 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
328 let execution_family = match query.mode() {
329 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
330 QueryMode::Delete(_) => None,
331 };
332
333 Ok(QueryTracePlan::new(
334 plan_hash,
335 access_strategy,
336 execution_family,
337 explain,
338 ))
339 }
340
341 pub(crate) fn execute_load_query_paged_with_trace<E>(
343 &self,
344 query: &Query<E>,
345 cursor_token: Option<&str>,
346 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
347 where
348 E: PersistedRow<Canister = C> + EntityValue,
349 {
350 let plan = self
352 .compile_query_with_visible_indexes(query)?
353 .into_prepared_execution_plan();
354 Self::ensure_scalar_paged_execution_family(
355 plan.execution_family().map_err(QueryError::execute)?,
356 )?;
357
358 let cursor_bytes = decode_optional_cursor_token(cursor_token)
360 .map_err(QueryError::from_cursor_plan_error)?;
361 let cursor = plan
362 .prepare_cursor(cursor_bytes.as_deref())
363 .map_err(QueryError::from_executor_plan_error)?;
364
365 let (page, trace) = self
367 .with_metrics(|| {
368 self.load_executor::<E>()
369 .execute_paged_with_cursor_traced(plan, cursor)
370 })
371 .map_err(QueryError::execute)?;
372 let next_cursor = page
373 .next_cursor
374 .map(|token| {
375 let Some(token) = token.as_scalar() else {
376 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
377 };
378
379 token.encode().map_err(|err| {
380 QueryError::serialize_internal(format!(
381 "failed to serialize continuation cursor: {err}"
382 ))
383 })
384 })
385 .transpose()?;
386
387 Ok(PagedLoadExecutionWithTrace::new(
388 page.items,
389 next_cursor,
390 trace,
391 ))
392 }
393
394 pub fn execute_grouped<E>(
399 &self,
400 query: &Query<E>,
401 cursor_token: Option<&str>,
402 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
403 where
404 E: PersistedRow<Canister = C> + EntityValue,
405 {
406 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
407 let next_cursor = page
408 .next_cursor
409 .map(|token| {
410 let Some(token) = token.as_grouped() else {
411 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
412 };
413
414 token.encode().map_err(|err| {
415 QueryError::serialize_internal(format!(
416 "failed to serialize grouped continuation cursor: {err}"
417 ))
418 })
419 })
420 .transpose()?;
421
422 Ok(PagedGroupedExecutionWithTrace::new(
423 page.rows,
424 next_cursor,
425 trace,
426 ))
427 }
428
429 #[doc(hidden)]
431 pub fn execute_grouped_text_cursor<E>(
432 &self,
433 query: &Query<E>,
434 cursor_token: Option<&str>,
435 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
436 where
437 E: PersistedRow<Canister = C> + EntityValue,
438 {
439 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
440 let next_cursor = page
441 .next_cursor
442 .map(Self::encode_grouped_page_cursor_hex)
443 .transpose()?;
444
445 Ok((page.rows, next_cursor, trace))
446 }
447}
448
449impl<C: CanisterKind> DbSession<C> {
450 fn execute_grouped_page_with_trace<E>(
453 &self,
454 query: &Query<E>,
455 cursor_token: Option<&str>,
456 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
457 where
458 E: PersistedRow<Canister = C> + EntityValue,
459 {
460 let plan = self
462 .compile_query_with_visible_indexes(query)?
463 .into_prepared_execution_plan();
464 Self::ensure_grouped_execution_family(
465 plan.execution_family().map_err(QueryError::execute)?,
466 )?;
467
468 let cursor = decode_optional_grouped_cursor_token(cursor_token)
470 .map_err(QueryError::from_cursor_plan_error)?;
471 let cursor = plan
472 .prepare_grouped_cursor_token(cursor)
473 .map_err(QueryError::from_executor_plan_error)?;
474
475 self.with_metrics(|| {
478 self.load_executor::<E>()
479 .execute_grouped_paged_with_cursor_traced(plan, cursor)
480 })
481 .map_err(QueryError::execute)
482 }
483
484 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
487 let token: &GroupedContinuationToken = page_cursor
488 .as_grouped()
489 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
490
491 token.encode_hex().map_err(|err| {
492 QueryError::serialize_internal(format!(
493 "failed to serialize grouped continuation cursor: {err}"
494 ))
495 })
496 }
497}