1use crate::{
7 db::{
8 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10 TraceExecutionStrategy,
11 access::AccessStrategy,
12 cursor::{CursorPlanError, GroupedContinuationToken},
13 diagnostics::ExecutionTrace,
14 executor::{
15 ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16 },
17 query::builder::{
18 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
19 },
20 query::explain::{
21 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
22 },
23 query::intent::{CompiledQuery, PlannedQuery},
24 query::plan::QueryMode,
25 session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
26 },
27 error::InternalError,
28 traits::{CanisterKind, EntityKind, EntityValue, Path},
29};
30
31impl<C: CanisterKind> DbSession<C> {
32 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
35 &self,
36 query: &Query<E>,
37 ) -> Result<CompiledQuery<E>, QueryError>
38 where
39 E: EntityKind<Canister = C>,
40 {
41 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
42
43 query.plan_with_visible_indexes(&visible_indexes)
44 }
45
46 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
49 &self,
50 query: &Query<E>,
51 ) -> Result<PlannedQuery<E>, QueryError>
52 where
53 E: EntityKind<Canister = C>,
54 {
55 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
56
57 query.planned_with_visible_indexes(&visible_indexes)
58 }
59
60 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
62 &self,
63 query: &Query<E>,
64 ) -> Result<ExplainPlan, QueryError>
65 where
66 E: EntityKind<Canister = C>,
67 {
68 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
69
70 query.explain_with_visible_indexes(&visible_indexes)
71 }
72
73 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
76 &self,
77 query: &Query<E>,
78 ) -> Result<String, QueryError>
79 where
80 E: EntityKind<Canister = C>,
81 {
82 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
83
84 query.plan_hash_hex_with_visible_indexes(&visible_indexes)
85 }
86
87 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
90 &self,
91 query: &Query<E>,
92 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
93 where
94 E: EntityValue + EntityKind<Canister = C>,
95 {
96 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
97
98 query.explain_execution_with_visible_indexes(&visible_indexes)
99 }
100
101 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
104 &self,
105 query: &Query<E>,
106 ) -> Result<String, QueryError>
107 where
108 E: EntityValue + EntityKind<Canister = C>,
109 {
110 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
111
112 query.explain_execution_text_with_visible_indexes(&visible_indexes)
113 }
114
115 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
118 &self,
119 query: &Query<E>,
120 ) -> Result<String, QueryError>
121 where
122 E: EntityValue + EntityKind<Canister = C>,
123 {
124 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
125
126 query.explain_execution_json_with_visible_indexes(&visible_indexes)
127 }
128
129 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
132 &self,
133 query: &Query<E>,
134 ) -> Result<String, QueryError>
135 where
136 E: EntityValue + EntityKind<Canister = C>,
137 {
138 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
139
140 query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
141 }
142
143 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
146 &self,
147 query: &Query<E>,
148 strategy: &S,
149 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
150 where
151 E: EntityValue + EntityKind<Canister = C>,
152 S: PreparedFluentAggregateExplainStrategy,
153 {
154 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
155
156 query.explain_prepared_aggregate_terminal_with_visible_indexes(&visible_indexes, strategy)
157 }
158
159 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
162 &self,
163 query: &Query<E>,
164 target_field: &str,
165 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
166 where
167 E: EntityValue + EntityKind<Canister = C>,
168 {
169 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
170
171 query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
172 }
173
174 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
177 &self,
178 query: &Query<E>,
179 strategy: &PreparedFluentProjectionStrategy,
180 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
181 where
182 E: EntityValue + EntityKind<Canister = C>,
183 {
184 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
185
186 query.explain_prepared_projection_terminal_with_visible_indexes(&visible_indexes, strategy)
187 }
188
189 fn ensure_scalar_paged_execution_strategy(
192 strategy: ExecutionStrategy,
193 ) -> Result<(), QueryError> {
194 match strategy {
195 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
196 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
197 )),
198 ExecutionStrategy::Ordered => Ok(()),
199 ExecutionStrategy::Grouped => Err(QueryError::invariant(
200 "grouped plans require execute_grouped(...)",
201 )),
202 }
203 }
204
205 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
208 match strategy {
209 ExecutionStrategy::Grouped => Ok(()),
210 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
211 QueryError::invariant("execute_grouped requires grouped logical plans"),
212 ),
213 }
214 }
215
216 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
218 where
219 E: PersistedRow<Canister = C> + EntityValue,
220 {
221 let mode = query.mode();
223 let plan = self
224 .compile_query_with_visible_indexes(query)?
225 .into_executable();
226
227 self.execute_query_dyn(mode, plan)
229 }
230
231 #[doc(hidden)]
233 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
234 where
235 E: PersistedRow<Canister = C> + EntityValue,
236 {
237 if !query.mode().is_delete() {
239 return Err(QueryError::unsupported_query(
240 "delete count execution requires delete query mode",
241 ));
242 }
243
244 let plan = self
246 .compile_query_with_visible_indexes(query)?
247 .into_executable();
248
249 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
251 .map_err(QueryError::execute)
252 }
253
254 pub(in crate::db) fn execute_query_dyn<E>(
259 &self,
260 mode: QueryMode,
261 plan: ExecutablePlan<E>,
262 ) -> Result<EntityResponse<E>, QueryError>
263 where
264 E: PersistedRow<Canister = C> + EntityValue,
265 {
266 let result = match mode {
267 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
268 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
269 };
270
271 result.map_err(QueryError::execute)
272 }
273
274 pub(in crate::db) fn execute_load_query_with<E, T>(
277 &self,
278 query: &Query<E>,
279 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
280 ) -> Result<T, QueryError>
281 where
282 E: PersistedRow<Canister = C> + EntityValue,
283 {
284 let plan = self
285 .compile_query_with_visible_indexes(query)?
286 .into_executable();
287
288 self.with_metrics(|| op(self.load_executor::<E>(), plan))
289 .map_err(QueryError::execute)
290 }
291
292 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
297 where
298 E: EntityKind<Canister = C>,
299 {
300 let compiled = self.compile_query_with_visible_indexes(query)?;
301 let explain = compiled.explain();
302 let plan_hash = compiled.plan_hash_hex();
303
304 let executable = compiled.into_executable();
305 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
306 let execution_strategy = match query.mode() {
307 QueryMode::Load(_) => Some(trace_execution_strategy(
308 executable
309 .execution_strategy()
310 .map_err(QueryError::execute)?,
311 )),
312 QueryMode::Delete(_) => None,
313 };
314
315 Ok(QueryTracePlan::new(
316 plan_hash,
317 access_strategy,
318 execution_strategy,
319 explain,
320 ))
321 }
322
323 pub(crate) fn execute_load_query_paged_with_trace<E>(
325 &self,
326 query: &Query<E>,
327 cursor_token: Option<&str>,
328 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
329 where
330 E: PersistedRow<Canister = C> + EntityValue,
331 {
332 let plan = self
334 .compile_query_with_visible_indexes(query)?
335 .into_executable();
336 Self::ensure_scalar_paged_execution_strategy(
337 plan.execution_strategy().map_err(QueryError::execute)?,
338 )?;
339
340 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
342 let cursor = plan
343 .prepare_cursor(cursor_bytes.as_deref())
344 .map_err(QueryError::from_executor_plan_error)?;
345
346 let (page, trace) = self
348 .with_metrics(|| {
349 self.load_executor::<E>()
350 .execute_paged_with_cursor_traced(plan, cursor)
351 })
352 .map_err(QueryError::execute)?;
353 let next_cursor = page
354 .next_cursor
355 .map(|token| {
356 let Some(token) = token.as_scalar() else {
357 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
358 };
359
360 token.encode().map_err(|err| {
361 QueryError::serialize_internal(format!(
362 "failed to serialize continuation cursor: {err}"
363 ))
364 })
365 })
366 .transpose()?;
367
368 Ok(PagedLoadExecutionWithTrace::new(
369 page.items,
370 next_cursor,
371 trace,
372 ))
373 }
374
375 pub fn execute_grouped<E>(
380 &self,
381 query: &Query<E>,
382 cursor_token: Option<&str>,
383 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
384 where
385 E: PersistedRow<Canister = C> + EntityValue,
386 {
387 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
388 let next_cursor = page
389 .next_cursor
390 .map(|token| {
391 let Some(token) = token.as_grouped() else {
392 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
393 };
394
395 token.encode().map_err(|err| {
396 QueryError::serialize_internal(format!(
397 "failed to serialize grouped continuation cursor: {err}"
398 ))
399 })
400 })
401 .transpose()?;
402
403 Ok(PagedGroupedExecutionWithTrace::new(
404 page.rows,
405 next_cursor,
406 trace,
407 ))
408 }
409
410 #[doc(hidden)]
412 pub fn execute_grouped_text_cursor<E>(
413 &self,
414 query: &Query<E>,
415 cursor_token: Option<&str>,
416 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
417 where
418 E: PersistedRow<Canister = C> + EntityValue,
419 {
420 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
421 let next_cursor = page
422 .next_cursor
423 .map(Self::encode_grouped_page_cursor_hex)
424 .transpose()?;
425
426 Ok((page.rows, next_cursor, trace))
427 }
428}
429
430impl<C: CanisterKind> DbSession<C> {
431 fn execute_grouped_page_with_trace<E>(
434 &self,
435 query: &Query<E>,
436 cursor_token: Option<&str>,
437 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
438 where
439 E: PersistedRow<Canister = C> + EntityValue,
440 {
441 let plan = self
443 .compile_query_with_visible_indexes(query)?
444 .into_executable();
445 Self::ensure_grouped_execution_strategy(
446 plan.execution_strategy().map_err(QueryError::execute)?,
447 )?;
448
449 let cursor = decode_optional_grouped_cursor(cursor_token)?;
451 let cursor = plan
452 .prepare_grouped_cursor_token(cursor)
453 .map_err(QueryError::from_executor_plan_error)?;
454
455 self.with_metrics(|| {
458 self.load_executor::<E>()
459 .execute_grouped_paged_with_cursor_traced(plan, cursor)
460 })
461 .map_err(QueryError::execute)
462 }
463
464 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
467 let token: &GroupedContinuationToken = page_cursor
468 .as_grouped()
469 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
470
471 token.encode_hex().map_err(|err| {
472 QueryError::serialize_internal(format!(
473 "failed to serialize grouped continuation cursor: {err}"
474 ))
475 })
476 }
477}
478
479const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
480 match strategy {
481 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
482 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
483 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
484 }
485}