1use crate::{
7 db::{
8 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, PagedGroupedExecutionWithTrace,
9 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
10 TraceExecutionStrategy,
11 access::AccessStrategy,
12 cursor::{CursorPlanError, GroupedContinuationToken},
13 diagnostics::ExecutionTrace,
14 executor::{
15 ExecutablePlan, ExecutionStrategy, GroupedCursorPage, LoadExecutor, PageCursor,
16 },
17 query::builder::aggregate::{
18 PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentScalarTerminalStrategy,
19 },
20 query::explain::{
21 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
22 },
23 query::intent::{CompiledQuery, PlannedQuery},
24 query::plan::QueryMode,
25 session::{decode_optional_cursor_bytes, decode_optional_grouped_cursor},
26 },
27 error::InternalError,
28 traits::{CanisterKind, EntityKind, EntityValue, Path},
29};
30
31#[cfg(test)]
32use crate::db::query::builder::aggregate::PreparedFluentNumericFieldStrategy;
33
34impl<C: CanisterKind> DbSession<C> {
35 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
38 &self,
39 query: &Query<E>,
40 ) -> Result<CompiledQuery<E>, QueryError>
41 where
42 E: EntityKind<Canister = C>,
43 {
44 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
45
46 query.plan_with_visible_indexes(&visible_indexes)
47 }
48
49 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
52 &self,
53 query: &Query<E>,
54 ) -> Result<PlannedQuery<E>, QueryError>
55 where
56 E: EntityKind<Canister = C>,
57 {
58 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
59
60 query.planned_with_visible_indexes(&visible_indexes)
61 }
62
63 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
65 &self,
66 query: &Query<E>,
67 ) -> Result<ExplainPlan, QueryError>
68 where
69 E: EntityKind<Canister = C>,
70 {
71 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
72
73 query.explain_with_visible_indexes(&visible_indexes)
74 }
75
76 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
79 &self,
80 query: &Query<E>,
81 ) -> Result<String, QueryError>
82 where
83 E: EntityKind<Canister = C>,
84 {
85 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
86
87 query.plan_hash_hex_with_visible_indexes(&visible_indexes)
88 }
89
90 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
93 &self,
94 query: &Query<E>,
95 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
96 where
97 E: EntityValue + EntityKind<Canister = C>,
98 {
99 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
100
101 query.explain_execution_with_visible_indexes(&visible_indexes)
102 }
103
104 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
107 &self,
108 query: &Query<E>,
109 ) -> Result<String, QueryError>
110 where
111 E: EntityValue + EntityKind<Canister = C>,
112 {
113 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
114
115 query.explain_execution_text_with_visible_indexes(&visible_indexes)
116 }
117
118 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
121 &self,
122 query: &Query<E>,
123 ) -> Result<String, QueryError>
124 where
125 E: EntityValue + EntityKind<Canister = C>,
126 {
127 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
128
129 query.explain_execution_json_with_visible_indexes(&visible_indexes)
130 }
131
132 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
135 &self,
136 query: &Query<E>,
137 ) -> Result<String, QueryError>
138 where
139 E: EntityValue + EntityKind<Canister = C>,
140 {
141 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
142
143 query.explain_execution_verbose_with_visible_indexes(&visible_indexes)
144 }
145
146 pub(in crate::db) fn explain_query_prepared_scalar_terminal_with_visible_indexes<E>(
149 &self,
150 query: &Query<E>,
151 strategy: &PreparedFluentScalarTerminalStrategy,
152 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
153 where
154 E: EntityValue + EntityKind<Canister = C>,
155 {
156 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
157
158 query.explain_prepared_scalar_terminal_with_visible_indexes(&visible_indexes, strategy)
159 }
160
161 pub(in crate::db) fn explain_query_prepared_order_sensitive_terminal_with_visible_indexes<E>(
164 &self,
165 query: &Query<E>,
166 strategy: &PreparedFluentOrderSensitiveTerminalStrategy,
167 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
168 where
169 E: EntityValue + EntityKind<Canister = C>,
170 {
171 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
172
173 query.explain_prepared_order_sensitive_terminal_with_visible_indexes(
174 &visible_indexes,
175 strategy,
176 )
177 }
178
179 #[cfg(test)]
182 pub(in crate::db) fn explain_query_prepared_numeric_field_with_visible_indexes<E>(
183 &self,
184 query: &Query<E>,
185 strategy: &PreparedFluentNumericFieldStrategy,
186 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
187 where
188 E: EntityValue + EntityKind<Canister = C>,
189 {
190 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
191
192 query.explain_prepared_numeric_field_with_visible_indexes(&visible_indexes, strategy)
193 }
194
195 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
198 &self,
199 query: &Query<E>,
200 target_field: &str,
201 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
202 where
203 E: EntityValue + EntityKind<Canister = C>,
204 {
205 let visible_indexes = self.visible_indexes_for_store_model(E::Store::PATH, E::MODEL)?;
206
207 query.explain_bytes_by_with_visible_indexes(&visible_indexes, target_field)
208 }
209
210 fn ensure_scalar_paged_execution_strategy(
213 strategy: ExecutionStrategy,
214 ) -> Result<(), QueryError> {
215 match strategy {
216 ExecutionStrategy::PrimaryKey => Err(QueryError::invariant(
217 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
218 )),
219 ExecutionStrategy::Ordered => Ok(()),
220 ExecutionStrategy::Grouped => Err(QueryError::invariant(
221 "grouped plans require execute_grouped(...)",
222 )),
223 }
224 }
225
226 fn ensure_grouped_execution_strategy(strategy: ExecutionStrategy) -> Result<(), QueryError> {
229 match strategy {
230 ExecutionStrategy::Grouped => Ok(()),
231 ExecutionStrategy::PrimaryKey | ExecutionStrategy::Ordered => Err(
232 QueryError::invariant("execute_grouped requires grouped logical plans"),
233 ),
234 }
235 }
236
237 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
239 where
240 E: PersistedRow<Canister = C> + EntityValue,
241 {
242 let mode = query.mode();
244 let plan = self
245 .compile_query_with_visible_indexes(query)?
246 .into_executable();
247
248 self.execute_query_dyn(mode, plan)
250 }
251
252 #[doc(hidden)]
254 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
255 where
256 E: PersistedRow<Canister = C> + EntityValue,
257 {
258 if !query.mode().is_delete() {
260 return Err(QueryError::unsupported_query(
261 "delete count execution requires delete query mode",
262 ));
263 }
264
265 let plan = self
267 .compile_query_with_visible_indexes(query)?
268 .into_executable();
269
270 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
272 .map_err(QueryError::execute)
273 }
274
275 pub(in crate::db) fn execute_query_dyn<E>(
280 &self,
281 mode: QueryMode,
282 plan: ExecutablePlan<E>,
283 ) -> Result<EntityResponse<E>, QueryError>
284 where
285 E: PersistedRow<Canister = C> + EntityValue,
286 {
287 let result = match mode {
288 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
289 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
290 };
291
292 result.map_err(QueryError::execute)
293 }
294
295 pub(in crate::db) fn execute_load_query_with<E, T>(
298 &self,
299 query: &Query<E>,
300 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
301 ) -> Result<T, QueryError>
302 where
303 E: PersistedRow<Canister = C> + EntityValue,
304 {
305 let plan = self
306 .compile_query_with_visible_indexes(query)?
307 .into_executable();
308
309 self.with_metrics(|| op(self.load_executor::<E>(), plan))
310 .map_err(QueryError::execute)
311 }
312
313 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
318 where
319 E: EntityKind<Canister = C>,
320 {
321 let compiled = self.compile_query_with_visible_indexes(query)?;
322 let explain = compiled.explain();
323 let plan_hash = compiled.plan_hash_hex();
324
325 let executable = compiled.into_executable();
326 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
327 let execution_strategy = match query.mode() {
328 QueryMode::Load(_) => Some(trace_execution_strategy(
329 executable
330 .execution_strategy()
331 .map_err(QueryError::execute)?,
332 )),
333 QueryMode::Delete(_) => None,
334 };
335
336 Ok(QueryTracePlan::new(
337 plan_hash,
338 access_strategy,
339 execution_strategy,
340 explain,
341 ))
342 }
343
344 pub(crate) fn execute_load_query_paged_with_trace<E>(
346 &self,
347 query: &Query<E>,
348 cursor_token: Option<&str>,
349 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
350 where
351 E: PersistedRow<Canister = C> + EntityValue,
352 {
353 let plan = self
355 .compile_query_with_visible_indexes(query)?
356 .into_executable();
357 Self::ensure_scalar_paged_execution_strategy(
358 plan.execution_strategy().map_err(QueryError::execute)?,
359 )?;
360
361 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
363 let cursor = plan
364 .prepare_cursor(cursor_bytes.as_deref())
365 .map_err(QueryError::from_executor_plan_error)?;
366
367 let (page, trace) = self
369 .with_metrics(|| {
370 self.load_executor::<E>()
371 .execute_paged_with_cursor_traced(plan, cursor)
372 })
373 .map_err(QueryError::execute)?;
374 let next_cursor = page
375 .next_cursor
376 .map(|token| {
377 let Some(token) = token.as_scalar() else {
378 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
379 };
380
381 token.encode().map_err(|err| {
382 QueryError::serialize_internal(format!(
383 "failed to serialize continuation cursor: {err}"
384 ))
385 })
386 })
387 .transpose()?;
388
389 Ok(PagedLoadExecutionWithTrace::new(
390 page.items,
391 next_cursor,
392 trace,
393 ))
394 }
395
396 pub fn execute_grouped<E>(
401 &self,
402 query: &Query<E>,
403 cursor_token: Option<&str>,
404 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
405 where
406 E: PersistedRow<Canister = C> + EntityValue,
407 {
408 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
409 let next_cursor = page
410 .next_cursor
411 .map(|token| {
412 let Some(token) = token.as_grouped() else {
413 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
414 };
415
416 token.encode().map_err(|err| {
417 QueryError::serialize_internal(format!(
418 "failed to serialize grouped continuation cursor: {err}"
419 ))
420 })
421 })
422 .transpose()?;
423
424 Ok(PagedGroupedExecutionWithTrace::new(
425 page.rows,
426 next_cursor,
427 trace,
428 ))
429 }
430
431 #[doc(hidden)]
433 pub fn execute_grouped_text_cursor<E>(
434 &self,
435 query: &Query<E>,
436 cursor_token: Option<&str>,
437 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
438 where
439 E: PersistedRow<Canister = C> + EntityValue,
440 {
441 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
442 let next_cursor = page
443 .next_cursor
444 .map(Self::encode_grouped_page_cursor_hex)
445 .transpose()?;
446
447 Ok((page.rows, next_cursor, trace))
448 }
449}
450
451impl<C: CanisterKind> DbSession<C> {
452 fn execute_grouped_page_with_trace<E>(
455 &self,
456 query: &Query<E>,
457 cursor_token: Option<&str>,
458 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
459 where
460 E: PersistedRow<Canister = C> + EntityValue,
461 {
462 let plan = self
464 .compile_query_with_visible_indexes(query)?
465 .into_executable();
466 Self::ensure_grouped_execution_strategy(
467 plan.execution_strategy().map_err(QueryError::execute)?,
468 )?;
469
470 let cursor = decode_optional_grouped_cursor(cursor_token)?;
472 let cursor = plan
473 .prepare_grouped_cursor_token(cursor)
474 .map_err(QueryError::from_executor_plan_error)?;
475
476 self.with_metrics(|| {
479 self.load_executor::<E>()
480 .execute_grouped_paged_with_cursor_traced(plan, cursor)
481 })
482 .map_err(QueryError::execute)
483 }
484
485 fn encode_grouped_page_cursor_hex(page_cursor: PageCursor) -> Result<String, QueryError> {
488 let token: &GroupedContinuationToken = page_cursor
489 .as_grouped()
490 .ok_or_else(QueryError::grouped_paged_emitted_scalar_continuation)?;
491
492 token.encode_hex().map_err(|err| {
493 QueryError::serialize_internal(format!(
494 "failed to serialize grouped continuation cursor: {err}"
495 ))
496 })
497 }
498}
499
500const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
501 match strategy {
502 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
503 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
504 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
505 }
506}