1use crate::{
7 db::{
8 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10 access::AccessStrategy,
11 cursor::{
12 CursorPlanError, GroupedContinuationToken, decode_optional_cursor_token,
13 decode_optional_grouped_cursor_token,
14 },
15 diagnostics::ExecutionTrace,
16 executor::{
17 ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
18 },
19 query::builder::{
20 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
21 },
22 query::explain::{
23 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
24 },
25 query::intent::{CompiledQuery, PlannedQuery},
26 query::plan::QueryMode,
27 },
28 error::InternalError,
29 traits::{CanisterKind, EntityKind, EntityValue, Path},
30};
31
32impl<C: CanisterKind> DbSession<C> {
33 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
36 &self,
37 query: &Query<E>,
38 ) -> Result<CompiledQuery<E>, QueryError>
39 where
40 E: EntityKind<Canister = C>,
41 {
42 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
43
44 query.plan_with_visible_indexes(&visible_indexes)
45 }
46
47 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
50 &self,
51 query: &Query<E>,
52 ) -> Result<PlannedQuery<E>, QueryError>
53 where
54 E: EntityKind<Canister = C>,
55 {
56 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
57
58 query.planned_with_visible_indexes(&visible_indexes)
59 }
60
61 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
63 &self,
64 query: &Query<E>,
65 ) -> Result<ExplainPlan, QueryError>
66 where
67 E: EntityKind<Canister = C>,
68 {
69 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
70
71 query.explain_with_visible_indexes(&visible_indexes)
72 }
73
74 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
77 &self,
78 query: &Query<E>,
79 ) -> Result<String, QueryError>
80 where
81 E: EntityKind<Canister = C>,
82 {
83 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
84
85 query.plan_hash_hex_with_visible_indexes(&visible_indexes)
86 }
87
88 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
91 &self,
92 query: &Query<E>,
93 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
94 where
95 E: EntityValue + EntityKind<Canister = C>,
96 {
97 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
98
99 query.explain_execution_with_visible_indexes(&visible_indexes)
100 }
101
102 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
105 &self,
106 query: &Query<E>,
107 ) -> Result<String, QueryError>
108 where
109 E: EntityValue + EntityKind<Canister = C>,
110 {
111 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
112
113 query.explain_execution_text_with_visible_indexes(&visible_indexes)
114 }
115
116 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
119 &self,
120 query: &Query<E>,
121 ) -> Result<String, QueryError>
122 where
123 E: EntityValue + EntityKind<Canister = C>,
124 {
125 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
126
127 query.explain_execution_json_with_visible_indexes(&visible_indexes)
128 }
129
130 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
133 &self,
134 query: &Query<E>,
135 ) -> Result<String, QueryError>
136 where
137 E: EntityValue + EntityKind<Canister = C>,
138 {
139 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
140
141 query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
142 }
143
144 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
147 &self,
148 query: &Query<E>,
149 strategy: &S,
150 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
151 where
152 E: EntityValue + EntityKind<Canister = C>,
153 S: PreparedFluentAggregateExplainStrategy,
154 {
155 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
156
157 query.explain_prepared_aggregate_terminal_with_visible_indexes(&visible_indexes, strategy)
158 }
159
160 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
163 &self,
164 query: &Query<E>,
165 target_field: &str,
166 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
167 where
168 E: EntityValue + EntityKind<Canister = C>,
169 {
170 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
171
172 query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
173 }
174
175 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
178 &self,
179 query: &Query<E>,
180 strategy: &PreparedFluentProjectionStrategy,
181 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
182 where
183 E: EntityValue + EntityKind<Canister = C>,
184 {
185 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
186
187 query.explain_prepared_projection_terminal_with_visible_indexes(&visible_indexes, strategy)
188 }
189
190 fn ensure_scalar_paged_execution_strategy(
193 strategy: ExecutionStrategy,
194 ) -> Result<(), QueryError> {
195 match strategy {
196 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
197 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
198 )),
199 ExecutionStrategy::Ordered => Ok(()),
200 ExecutionStrategy::Grouped => Err(QueryError::invariant(
201 "grouped plans require execute_grouped(...)",
202 )),
203 }
204 }
205
206 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
209 match strategy {
210 ExecutionStrategy::Grouped => Ok(()),
211 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
212 QueryError::invariant("execute_grouped requires grouped logical plans"),
213 ),
214 }
215 }
216
217 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
219 where
220 E: PersistedRow<Canister = C> + EntityValue,
221 {
222 let mode = query.mode();
224 let plan = self
225 .compile_query_with_visible_indexes(query)?
226 .into_executable();
227
228 self.execute_query_dyn(mode, plan)
230 }
231
232 #[doc(hidden)]
234 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
235 where
236 E: PersistedRow<Canister = C> + EntityValue,
237 {
238 if !query.mode().is_delete() {
240 return Err(QueryError::unsupported_query(
241 "delete count execution requires delete query mode",
242 ));
243 }
244
245 let plan = self
247 .compile_query_with_visible_indexes(query)?
248 .into_executable();
249
250 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
252 .map_err(QueryError::execute)
253 }
254
255 pub(in crate::db) fn execute_query_dyn<E>(
260 &self,
261 mode: QueryMode,
262 plan: ExecutablePlan<E>,
263 ) -> Result<EntityResponse<E>, QueryError>
264 where
265 E: PersistedRow<Canister = C> + EntityValue,
266 {
267 let result = match mode {
268 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
269 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
270 };
271
272 result.map_err(QueryError::execute)
273 }
274
275 pub(in crate::db) fn execute_load_query_with<E, T>(
278 &self,
279 query: &Query<E>,
280 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
281 ) -> Result<T, QueryError>
282 where
283 E: PersistedRow<Canister = C> + EntityValue,
284 {
285 let plan = self
286 .compile_query_with_visible_indexes(query)?
287 .into_executable();
288
289 self.with_metrics(|| op(self.load_executor::<E>(), plan))
290 .map_err(QueryError::execute)
291 }
292
293 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
298 where
299 E: EntityKind<Canister = C>,
300 {
301 let compiled = self.compile_query_with_visible_indexes(query)?;
302 let explain = compiled.explain();
303 let plan_hash = compiled.plan_hash_hex();
304
305 let executable = compiled.into_executable();
306 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
307 let execution_strategy = match query.mode() {
308 QueryMode::Load(_) => Some(
309 executable
310 .execution_strategy()
311 .map_err(QueryError::execute)?,
312 ),
313 QueryMode::Delete(_) => None,
314 };
315
316 Ok(QueryTracePlan::new(
317 plan_hash,
318 access_strategy,
319 execution_strategy,
320 explain,
321 ))
322 }
323
324 pub(crate) fn execute_load_query_paged_with_trace<E>(
326 &self,
327 query: &Query<E>,
328 cursor_token: Option<&str>,
329 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
330 where
331 E: PersistedRow<Canister = C> + EntityValue,
332 {
333 let plan = self
335 .compile_query_with_visible_indexes(query)?
336 .into_executable();
337 Self::ensure_scalar_paged_execution_strategy(
338 plan.execution_strategy().map_err(QueryError::execute)?,
339 )?;
340
341 let cursor_bytes = decode_optional_cursor_token(cursor_token)
343 .map_err(QueryError::from_cursor_plan_error)?;
344 let cursor = plan
345 .prepare_cursor(cursor_bytes.as_deref())
346 .map_err(QueryError::from_executor_plan_error)?;
347
348 let (page, trace) = self
350 .with_metrics(|| {
351 self.load_executor::<E>()
352 .execute_paged_with_cursor_traced(plan, cursor)
353 })
354 .map_err(QueryError::execute)?;
355 let next_cursor = page
356 .next_cursor
357 .map(|token| {
358 let Some(token) = token.as_scalar() else {
359 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
360 };
361
362 token.encode().map_err(|err| {
363 QueryError::serialize_internal(format!(
364 "failed to serialize continuation cursor: {err}"
365 ))
366 })
367 })
368 .transpose()?;
369
370 Ok(PagedLoadExecutionWithTrace::new(
371 page.items,
372 next_cursor,
373 trace,
374 ))
375 }
376
377 pub fn execute_grouped<E>(
382 &self,
383 query: &Query<E>,
384 cursor_token: Option<&str>,
385 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
386 where
387 E: PersistedRow<Canister = C> + EntityValue,
388 {
389 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
390 let next_cursor = page
391 .next_cursor
392 .map(|token| {
393 let Some(token) = token.as_grouped() else {
394 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
395 };
396
397 token.encode().map_err(|err| {
398 QueryError::serialize_internal(format!(
399 "failed to serialize grouped continuation cursor: {err}"
400 ))
401 })
402 })
403 .transpose()?;
404
405 Ok(PagedGroupedExecutionWithTrace::new(
406 page.rows,
407 next_cursor,
408 trace,
409 ))
410 }
411
412 #[doc(hidden)]
414 pub fn execute_grouped_text_cursor<E>(
415 &self,
416 query: &Query<E>,
417 cursor_token: Option<&str>,
418 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
419 where
420 E: PersistedRow<Canister = C> + EntityValue,
421 {
422 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
423 let next_cursor = page
424 .next_cursor
425 .map(Self::encode_grouped_page_cursor_hex)
426 .transpose()?;
427
428 Ok((page.rows, next_cursor, trace))
429 }
430}
431
432impl<C: CanisterKind> DbSession<C> {
433 fn execute_grouped_page_with_trace<E>(
436 &self,
437 query: &Query<E>,
438 cursor_token: Option<&str>,
439 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
440 where
441 E: PersistedRow<Canister = C> + EntityValue,
442 {
443 let plan = self
445 .compile_query_with_visible_indexes(query)?
446 .into_executable();
447 Self::ensure_grouped_execution_strategy(
448 plan.execution_strategy().map_err(QueryError::execute)?,
449 )?;
450
451 let cursor = decode_optional_grouped_cursor_token(cursor_token)
453 .map_err(QueryError::from_cursor_plan_error)?;
454 let cursor = plan
455 .prepare_grouped_cursor_token(cursor)
456 .map_err(QueryError::from_executor_plan_error)?;
457
458 self.with_metrics(|| {
461 self.load_executor::<E>()
462 .execute_grouped_paged_with_cursor_traced(plan, cursor)
463 })
464 .map_err(QueryError::execute)
465 }
466
467 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
470 let token: &GroupedContinuationToken = page_cursor
471 .as_grouped()
472 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
473
474 token.encode_hex().map_err(|err| {
475 QueryError::serialize_internal(format!(
476 "failed to serialize grouped continuation cursor: {err}"
477 ))
478 })
479 }
480}