1use crate::{
7 db::{
8 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10 TraceExecutionStrategy,
11 access::AccessStrategy,
12 cursor::{CursorPlanError, GroupedContinuationToken},
13 diagnostics::ExecutionTrace,
14 executor::{
15 ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16 },
17 query::builder::aggregate::AggregateExpr,
18 query::explain::{
19 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
20 },
21 query::intent::{CompiledQuery, PlannedQuery},
22 query::plan::QueryMode,
23 session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
24 },
25 error::InternalError,
26 traits::{CanisterKind, EntityKind, EntityValue, Path},
27};
28
29impl<C: CanisterKind> DbSession<C> {
30 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
33 &self,
34 query: &Query<E>,
35 ) -> Result<CompiledQuery<E>, QueryError>
36 where
37 E: EntityKind<Canister = C>,
38 {
39 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
40
41 query.plan_with_visible_indexes(&visible_indexes)
42 }
43
44 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
47 &self,
48 query: &Query<E>,
49 ) -> Result<PlannedQuery<E>, QueryError>
50 where
51 E: EntityKind<Canister = C>,
52 {
53 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
54
55 query.planned_with_visible_indexes(&visible_indexes)
56 }
57
58 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
60 &self,
61 query: &Query<E>,
62 ) -> Result<ExplainPlan, QueryError>
63 where
64 E: EntityKind<Canister = C>,
65 {
66 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
67
68 query.explain_with_visible_indexes(&visible_indexes)
69 }
70
71 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
74 &self,
75 query: &Query<E>,
76 ) -> Result<String, QueryError>
77 where
78 E: EntityKind<Canister = C>,
79 {
80 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
81
82 query.plan_hash_hex_with_visible_indexes(&visible_indexes)
83 }
84
85 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
88 &self,
89 query: &Query<E>,
90 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
91 where
92 E: EntityValue + EntityKind<Canister = C>,
93 {
94 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
95
96 query.explain_execution_with_visible_indexes(&visible_indexes)
97 }
98
99 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
102 &self,
103 query: &Query<E>,
104 ) -> Result<String, QueryError>
105 where
106 E: EntityValue + EntityKind<Canister = C>,
107 {
108 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
109
110 query.explain_execution_text_with_visible_indexes(&visible_indexes)
111 }
112
113 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
116 &self,
117 query: &Query<E>,
118 ) -> Result<String, QueryError>
119 where
120 E: EntityValue + EntityKind<Canister = C>,
121 {
122 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
123
124 query.explain_execution_json_with_visible_indexes(&visible_indexes)
125 }
126
127 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
130 &self,
131 query: &Query<E>,
132 ) -> Result<String, QueryError>
133 where
134 E: EntityValue + EntityKind<Canister = C>,
135 {
136 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
137
138 query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
139 }
140
141 pub(in crate::db) fn explain_query_aggregate_terminal_with_visible_indexes<E>(
144 &self,
145 query: &Query<E>,
146 aggregate: AggregateExpr,
147 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
148 where
149 E: EntityValue + EntityKind<Canister = C>,
150 {
151 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
152
153 query.explain_aggregate_terminal_with_visible_indexes(&visible_indexes, aggregate)
154 }
155
156 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
159 &self,
160 query: &Query<E>,
161 target_field: &str,
162 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
163 where
164 E: EntityValue + EntityKind<Canister = C>,
165 {
166 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
167
168 query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
169 }
170
171 fn ensure_scalar_paged_execution_strategy(
174 strategy: ExecutionStrategy,
175 ) -> Result<(), QueryError> {
176 match strategy {
177 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
178 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
179 )),
180 ExecutionStrategy::Ordered => Ok(()),
181 ExecutionStrategy::Grouped => Err(QueryError::invariant(
182 "grouped plans require execute_grouped(...)",
183 )),
184 }
185 }
186
187 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
190 match strategy {
191 ExecutionStrategy::Grouped => Ok(()),
192 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
193 QueryError::invariant("execute_grouped requires grouped logical plans"),
194 ),
195 }
196 }
197
198 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
200 where
201 E: PersistedRow<Canister = C> + EntityValue,
202 {
203 let mode = query.mode();
205 let plan = self
206 .compile_query_with_visible_indexes(query)?
207 .into_executable();
208
209 self.execute_query_dyn(mode, plan)
211 }
212
213 #[doc(hidden)]
215 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
216 where
217 E: PersistedRow<Canister = C> + EntityValue,
218 {
219 if !query.mode().is_delete() {
221 return Err(QueryError::unsupported_query(
222 "delete count execution requires delete query mode",
223 ));
224 }
225
226 let plan = self
228 .compile_query_with_visible_indexes(query)?
229 .into_executable();
230
231 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
233 .map_err(QueryError::execute)
234 }
235
236 pub(in crate::db) fn execute_query_dyn<E>(
241 &self,
242 mode: QueryMode,
243 plan: ExecutablePlan<E>,
244 ) -> Result<EntityResponse<E>, QueryError>
245 where
246 E: PersistedRow<Canister = C> + EntityValue,
247 {
248 let result = match mode {
249 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
250 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
251 };
252
253 result.map_err(QueryError::execute)
254 }
255
256 pub(in crate::db) fn execute_load_query_with<E, T>(
259 &self,
260 query: &Query<E>,
261 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
262 ) -> Result<T, QueryError>
263 where
264 E: PersistedRow<Canister = C> + EntityValue,
265 {
266 let plan = self
267 .compile_query_with_visible_indexes(query)?
268 .into_executable();
269
270 self.with_metrics(|| op(self.load_executor::<E>(), plan))
271 .map_err(QueryError::execute)
272 }
273
274 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
279 where
280 E: EntityKind<Canister = C>,
281 {
282 let compiled = self.compile_query_with_visible_indexes(query)?;
283 let explain = compiled.explain();
284 let plan_hash = compiled.plan_hash_hex();
285
286 let executable = compiled.into_executable();
287 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
288 let execution_strategy = match query.mode() {
289 QueryMode::Load(_) => Some(trace_execution_strategy(
290 executable
291 .execution_strategy()
292 .map_err(QueryError::execute)?,
293 )),
294 QueryMode::Delete(_) => None,
295 };
296
297 Ok(QueryTracePlan::new(
298 plan_hash,
299 access_strategy,
300 execution_strategy,
301 explain,
302 ))
303 }
304
305 pub(crate) fn execute_load_query_paged_with_trace<E>(
307 &self,
308 query: &Query<E>,
309 cursor_token: Option<&str>,
310 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
311 where
312 E: PersistedRow<Canister = C> + EntityValue,
313 {
314 let plan = self
316 .compile_query_with_visible_indexes(query)?
317 .into_executable();
318 Self::ensure_scalar_paged_execution_strategy(
319 plan.execution_strategy().map_err(QueryError::execute)?,
320 )?;
321
322 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
324 let cursor = plan
325 .prepare_cursor(cursor_bytes.as_deref())
326 .map_err(QueryError::from_executor_plan_error)?;
327
328 let (page, trace) = self
330 .with_metrics(|| {
331 self.load_executor::<E>()
332 .execute_paged_with_cursor_traced(plan, cursor)
333 })
334 .map_err(QueryError::execute)?;
335 let next_cursor = page
336 .next_cursor
337 .map(|token| {
338 let Some(token) = token.as_scalar() else {
339 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
340 };
341
342 token.encode().map_err(|err| {
343 QueryError::serialize_internal(format!(
344 "failed to serialize continuation cursor: {err}"
345 ))
346 })
347 })
348 .transpose()?;
349
350 Ok(PagedLoadExecutionWithTrace::new(
351 page.items,
352 next_cursor,
353 trace,
354 ))
355 }
356
357 pub fn execute_grouped<E>(
362 &self,
363 query: &Query<E>,
364 cursor_token: Option<&str>,
365 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
366 where
367 E: PersistedRow<Canister = C> + EntityValue,
368 {
369 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
370 let next_cursor = page
371 .next_cursor
372 .map(|token| {
373 let Some(token) = token.as_grouped() else {
374 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
375 };
376
377 token.encode().map_err(|err| {
378 QueryError::serialize_internal(format!(
379 "failed to serialize grouped continuation cursor: {err}"
380 ))
381 })
382 })
383 .transpose()?;
384
385 Ok(PagedGroupedExecutionWithTrace::new(
386 page.rows,
387 next_cursor,
388 trace,
389 ))
390 }
391
392 #[doc(hidden)]
394 pub fn execute_grouped_text_cursor<E>(
395 &self,
396 query: &Query<E>,
397 cursor_token: Option<&str>,
398 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
399 where
400 E: PersistedRow<Canister = C> + EntityValue,
401 {
402 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
403 let next_cursor = page
404 .next_cursor
405 .map(Self::encode_grouped_page_cursor_hex)
406 .transpose()?;
407
408 Ok((page.rows, next_cursor, trace))
409 }
410}
411
412impl<C: CanisterKind> DbSession<C> {
413 fn execute_grouped_page_with_trace<E>(
416 &self,
417 query: &Query<E>,
418 cursor_token: Option<&str>,
419 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
420 where
421 E: PersistedRow<Canister = C> + EntityValue,
422 {
423 let plan = self
425 .compile_query_with_visible_indexes(query)?
426 .into_executable();
427 Self::ensure_grouped_execution_strategy(
428 plan.execution_strategy().map_err(QueryError::execute)?,
429 )?;
430
431 let cursor = decode_optional_grouped_cursor(cursor_token)?;
433 let cursor = plan
434 .prepare_grouped_cursor_token(cursor)
435 .map_err(QueryError::from_executor_plan_error)?;
436
437 self.with_metrics(|| {
440 self.load_executor::<E>()
441 .execute_grouped_paged_with_cursor_traced(plan, cursor)
442 })
443 .map_err(QueryError::execute)
444 }
445
446 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
449 let token: &GroupedContinuationToken = page_cursor
450 .as_grouped()
451 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
452
453 token.encode_hex().map_err(|err| {
454 QueryError::serialize_internal(format!(
455 "failed to serialize grouped continuation cursor: {err}"
456 ))
457 })
458 }
459}
460
461const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
462 match strategy {
463 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
464 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
465 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
466 }
467}