1use crate::{
8 db::{
9 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
10 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11 access::AccessStrategy,
12 commit::CommitSchemaFingerprint,
13 cursor::{
14 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
15 },
16 diagnostics::ExecutionTrace,
17 executor::{ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan},
18 query::builder::{
19 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
20 },
21 query::explain::{
22 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
23 },
24 query::{
25 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
26 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
27 },
28 },
29 error::InternalError,
30 model::entity::EntityModel,
31 traits::{CanisterKind, EntityKind, EntityValue, Path},
32};
33#[cfg(feature = "perf-attribution")]
34use candid::CandidType;
35#[cfg(feature = "perf-attribution")]
36use serde::Deserialize;
37use std::collections::HashMap;
38
39#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
40pub(in crate::db) enum QueryPlanVisibility {
41 StoreNotReady,
42 StoreReady,
43}
44
45#[derive(Clone, Debug, Eq, Hash, PartialEq)]
46pub(in crate::db) struct QueryPlanCacheKey {
47 entity_path: &'static str,
48 schema_fingerprint: CommitSchemaFingerprint,
49 visibility: QueryPlanVisibility,
50 query_fingerprint: [u8; 32],
51}
52
53pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, AccessPlannedQuery>;
54
55#[cfg(feature = "perf-attribution")]
62#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
63pub struct QueryExecutionAttribution {
64 pub compile_local_instructions: u64,
65 pub execute_local_instructions: u64,
66 pub total_local_instructions: u64,
67}
68
69#[cfg(feature = "perf-attribution")]
70#[expect(
71 clippy::missing_const_for_fn,
72 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
73)]
74fn read_query_local_instruction_counter() -> u64 {
75 #[cfg(target_arch = "wasm32")]
76 {
77 canic_cdk::api::performance_counter(1)
78 }
79
80 #[cfg(not(target_arch = "wasm32"))]
81 {
82 0
83 }
84}
85
86#[cfg(feature = "perf-attribution")]
87fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
88 let start = read_query_local_instruction_counter();
89 let result = run();
90 let delta = read_query_local_instruction_counter().saturating_sub(start);
91
92 (delta, result)
93}
94
95impl<C: CanisterKind> DbSession<C> {
96 fn query_plan_cache(&self) -> &std::cell::RefCell<QueryPlanCache> {
97 self.query_plan_cache
98 .get_or_init(|| std::cell::RefCell::new(QueryPlanCache::new()))
99 }
100
101 const fn visible_indexes_for_model(
102 model: &'static EntityModel,
103 visibility: QueryPlanVisibility,
104 ) -> VisibleIndexes<'static> {
105 match visibility {
106 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
107 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
108 }
109 }
110
111 #[cfg(test)]
112 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
113 self.query_plan_cache().borrow().len()
114 }
115
116 pub(in crate::db) fn query_plan_visibility_for_store_path(
117 &self,
118 store_path: &'static str,
119 ) -> Result<QueryPlanVisibility, QueryError> {
120 let store = self
121 .db
122 .recovered_store(store_path)
123 .map_err(QueryError::execute)?;
124 let visibility = if store.index_state() == crate::db::IndexState::Ready {
125 QueryPlanVisibility::StoreReady
126 } else {
127 QueryPlanVisibility::StoreNotReady
128 };
129
130 Ok(visibility)
131 }
132
133 pub(in crate::db) fn cached_structural_plan_for_authority(
134 &self,
135 entity_path: &'static str,
136 schema_fingerprint: CommitSchemaFingerprint,
137 store_path: &'static str,
138 model: &'static EntityModel,
139 query: &StructuralQuery,
140 ) -> Result<AccessPlannedQuery, QueryError> {
141 let visibility = self.query_plan_visibility_for_store_path(store_path)?;
142 let cache_key = QueryPlanCacheKey {
143 entity_path,
144 schema_fingerprint,
145 visibility,
146 query_fingerprint: query.cache_fingerprint(),
147 };
148
149 {
150 let cache = self.query_plan_cache().borrow();
151 if let Some(plan) = cache.get(&cache_key) {
152 return Ok(plan.clone());
153 }
154 }
155
156 let visible_indexes = Self::visible_indexes_for_model(model, visibility);
157 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
158 self.query_plan_cache()
159 .borrow_mut()
160 .insert(cache_key, plan.clone());
161
162 Ok(plan)
163 }
164
165 fn with_query_visible_indexes<E, T>(
168 &self,
169 query: &Query<E>,
170 op: impl FnOnce(
171 &Query<E>,
172 &crate::db::query::plan::VisibleIndexes<'static>,
173 ) -> Result<T, QueryError>,
174 ) -> Result<T, QueryError>
175 where
176 E: EntityKind<Canister = C>,
177 {
178 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
179 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
180
181 op(query, &visible_indexes)
182 }
183
184 fn cached_structural_plan_for_entity<E>(
187 &self,
188 query: &StructuralQuery,
189 ) -> Result<AccessPlannedQuery, QueryError>
190 where
191 E: EntityKind<Canister = C>,
192 {
193 self.cached_structural_plan_for_authority(
194 E::PATH,
195 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
196 E::Store::PATH,
197 E::MODEL,
198 query,
199 )
200 }
201
202 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
205 &self,
206 query: &Query<E>,
207 ) -> Result<CompiledQuery<E>, QueryError>
208 where
209 E: EntityKind<Canister = C>,
210 {
211 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
212
213 Ok(Query::<E>::compiled_query_from_plan(plan))
214 }
215
216 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
219 &self,
220 query: &Query<E>,
221 ) -> Result<PlannedQuery<E>, QueryError>
222 where
223 E: EntityKind<Canister = C>,
224 {
225 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
226
227 Ok(Query::<E>::planned_query_from_plan(plan))
228 }
229
230 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
232 &self,
233 query: &Query<E>,
234 ) -> Result<ExplainPlan, QueryError>
235 where
236 E: EntityKind<Canister = C>,
237 {
238 self.with_query_visible_indexes(query, |query, visible_indexes| {
239 query.explain_with_visible_indexes(visible_indexes)
240 })
241 }
242
243 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
246 &self,
247 query: &Query<E>,
248 ) -> Result<String, QueryError>
249 where
250 E: EntityKind<Canister = C>,
251 {
252 self.with_query_visible_indexes(query, |query, visible_indexes| {
253 query.plan_hash_hex_with_visible_indexes(visible_indexes)
254 })
255 }
256
257 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
260 &self,
261 query: &Query<E>,
262 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
263 where
264 E: EntityValue + EntityKind<Canister = C>,
265 {
266 self.with_query_visible_indexes(query, |query, visible_indexes| {
267 query.explain_execution_with_visible_indexes(visible_indexes)
268 })
269 }
270
271 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
274 &self,
275 query: &Query<E>,
276 ) -> Result<String, QueryError>
277 where
278 E: EntityValue + EntityKind<Canister = C>,
279 {
280 self.with_query_visible_indexes(query, |query, visible_indexes| {
281 query.explain_execution_text_with_visible_indexes(visible_indexes)
282 })
283 }
284
285 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
288 &self,
289 query: &Query<E>,
290 ) -> Result<String, QueryError>
291 where
292 E: EntityValue + EntityKind<Canister = C>,
293 {
294 self.with_query_visible_indexes(query, |query, visible_indexes| {
295 query.explain_execution_json_with_visible_indexes(visible_indexes)
296 })
297 }
298
299 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
302 &self,
303 query: &Query<E>,
304 ) -> Result<String, QueryError>
305 where
306 E: EntityValue + EntityKind<Canister = C>,
307 {
308 self.with_query_visible_indexes(query, |query, visible_indexes| {
309 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
310 })
311 }
312
313 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
316 &self,
317 query: &Query<E>,
318 strategy: &S,
319 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
320 where
321 E: EntityValue + EntityKind<Canister = C>,
322 S: PreparedFluentAggregateExplainStrategy,
323 {
324 self.with_query_visible_indexes(query, |query, visible_indexes| {
325 query
326 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
327 })
328 }
329
330 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
333 &self,
334 query: &Query<E>,
335 target_field: &str,
336 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
337 where
338 E: EntityValue + EntityKind<Canister = C>,
339 {
340 self.with_query_visible_indexes(query, |query, visible_indexes| {
341 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
342 })
343 }
344
345 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
348 &self,
349 query: &Query<E>,
350 strategy: &PreparedFluentProjectionStrategy,
351 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
352 where
353 E: EntityValue + EntityKind<Canister = C>,
354 {
355 self.with_query_visible_indexes(query, |query, visible_indexes| {
356 query.explain_prepared_projection_terminal_with_visible_indexes(
357 visible_indexes,
358 strategy,
359 )
360 })
361 }
362
363 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
366 match family {
367 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
368 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
369 )),
370 ExecutionFamily::Ordered => Ok(()),
371 ExecutionFamily::Grouped => Err(QueryError::invariant(
372 "grouped queries execute via execute(), not page().execute()",
373 )),
374 }
375 }
376
377 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
380 match family {
381 ExecutionFamily::Grouped => Ok(()),
382 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
383 "grouped execution requires grouped logical plans",
384 )),
385 }
386 }
387
388 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
390 where
391 E: PersistedRow<Canister = C> + EntityValue,
392 {
393 let mode = query.mode();
395 let plan = self
396 .compile_query_with_visible_indexes(query)?
397 .into_prepared_execution_plan();
398
399 self.execute_query_dyn(mode, plan)
401 }
402
403 #[cfg(feature = "perf-attribution")]
406 #[doc(hidden)]
407 pub fn execute_query_result_with_attribution<E>(
408 &self,
409 query: &Query<E>,
410 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
411 where
412 E: PersistedRow<Canister = C> + EntityValue,
413 {
414 let (compile_local_instructions, compiled) =
416 measure_query_stage(|| self.compile_query_with_visible_indexes(query));
417 let compiled = compiled?;
418 let plan = compiled.into_prepared_execution_plan();
419
420 let (execute_local_instructions, result) = measure_query_stage(|| {
422 if query.has_grouping() {
423 self.execute_grouped_plan_with_trace(plan, None)
424 .map(|(page, trace)| {
425 let next_cursor = page
426 .next_cursor
427 .map(|token| {
428 let Some(token) = token.as_grouped() else {
429 return Err(
430 QueryError::grouped_paged_emitted_scalar_continuation(),
431 );
432 };
433
434 token.encode().map_err(|err| {
435 QueryError::serialize_internal(format!(
436 "failed to serialize grouped continuation cursor: {err}"
437 ))
438 })
439 })
440 .transpose()?;
441
442 Ok::<LoadQueryResult<E>, QueryError>(LoadQueryResult::Grouped(
443 PagedGroupedExecutionWithTrace::new(page.rows, next_cursor, trace),
444 ))
445 })?
446 } else {
447 self.execute_query_dyn(query.mode(), plan)
448 .map(LoadQueryResult::Rows)
449 }
450 });
451 let result = result?;
452 let total_local_instructions =
453 compile_local_instructions.saturating_add(execute_local_instructions);
454
455 Ok((
456 result,
457 QueryExecutionAttribution {
458 compile_local_instructions,
459 execute_local_instructions,
460 total_local_instructions,
461 },
462 ))
463 }
464
465 #[doc(hidden)]
468 pub fn execute_query_result<E>(
469 &self,
470 query: &Query<E>,
471 ) -> Result<LoadQueryResult<E>, QueryError>
472 where
473 E: PersistedRow<Canister = C> + EntityValue,
474 {
475 if query.has_grouping() {
476 return self
477 .execute_grouped(query, None)
478 .map(LoadQueryResult::Grouped);
479 }
480
481 self.execute_query(query).map(LoadQueryResult::Rows)
482 }
483
484 #[doc(hidden)]
486 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
487 where
488 E: PersistedRow<Canister = C> + EntityValue,
489 {
490 if !query.mode().is_delete() {
492 return Err(QueryError::unsupported_query(
493 "delete count execution requires delete query mode",
494 ));
495 }
496
497 let plan = self
499 .compile_query_with_visible_indexes(query)?
500 .into_prepared_execution_plan();
501
502 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
504 .map_err(QueryError::execute)
505 }
506
507 pub(in crate::db) fn execute_query_dyn<E>(
512 &self,
513 mode: QueryMode,
514 plan: PreparedExecutionPlan<E>,
515 ) -> Result<EntityResponse<E>, QueryError>
516 where
517 E: PersistedRow<Canister = C> + EntityValue,
518 {
519 let result = match mode {
520 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
521 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
522 };
523
524 result.map_err(QueryError::execute)
525 }
526
527 pub(in crate::db) fn execute_load_query_with<E, T>(
530 &self,
531 query: &Query<E>,
532 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
533 ) -> Result<T, QueryError>
534 where
535 E: PersistedRow<Canister = C> + EntityValue,
536 {
537 let plan = self
538 .compile_query_with_visible_indexes(query)?
539 .into_prepared_execution_plan();
540
541 self.with_metrics(|| op(self.load_executor::<E>(), plan))
542 .map_err(QueryError::execute)
543 }
544
545 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
550 where
551 E: EntityKind<Canister = C>,
552 {
553 let compiled = self.compile_query_with_visible_indexes(query)?;
554 let explain = compiled.explain();
555 let plan_hash = compiled.plan_hash_hex();
556
557 let executable = compiled.into_prepared_execution_plan();
558 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
559 let execution_family = match query.mode() {
560 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
561 QueryMode::Delete(_) => None,
562 };
563
564 Ok(QueryTracePlan::new(
565 plan_hash,
566 access_strategy,
567 execution_family,
568 explain,
569 ))
570 }
571
572 pub(crate) fn execute_load_query_paged_with_trace<E>(
574 &self,
575 query: &Query<E>,
576 cursor_token: Option<&str>,
577 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
578 where
579 E: PersistedRow<Canister = C> + EntityValue,
580 {
581 let plan = self
583 .compile_query_with_visible_indexes(query)?
584 .into_prepared_execution_plan();
585 Self::ensure_scalar_paged_execution_family(
586 plan.execution_family().map_err(QueryError::execute)?,
587 )?;
588
589 let cursor_bytes = decode_optional_cursor_token(cursor_token)
591 .map_err(QueryError::from_cursor_plan_error)?;
592 let cursor = plan
593 .prepare_cursor(cursor_bytes.as_deref())
594 .map_err(QueryError::from_executor_plan_error)?;
595
596 let (page, trace) = self
598 .with_metrics(|| {
599 self.load_executor::<E>()
600 .execute_paged_with_cursor_traced(plan, cursor)
601 })
602 .map_err(QueryError::execute)?;
603 let next_cursor = page
604 .next_cursor
605 .map(|token| {
606 let Some(token) = token.as_scalar() else {
607 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
608 };
609
610 token.encode().map_err(|err| {
611 QueryError::serialize_internal(format!(
612 "failed to serialize continuation cursor: {err}"
613 ))
614 })
615 })
616 .transpose()?;
617
618 Ok(PagedLoadExecutionWithTrace::new(
619 page.items,
620 next_cursor,
621 trace,
622 ))
623 }
624
625 pub(in crate::db) fn execute_grouped<E>(
630 &self,
631 query: &Query<E>,
632 cursor_token: Option<&str>,
633 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
634 where
635 E: PersistedRow<Canister = C> + EntityValue,
636 {
637 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
638 let next_cursor = page
639 .next_cursor
640 .map(|token| {
641 let Some(token) = token.as_grouped() else {
642 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
643 };
644
645 token.encode().map_err(|err| {
646 QueryError::serialize_internal(format!(
647 "failed to serialize grouped continuation cursor: {err}"
648 ))
649 })
650 })
651 .transpose()?;
652
653 Ok(PagedGroupedExecutionWithTrace::new(
654 page.rows,
655 next_cursor,
656 trace,
657 ))
658 }
659
660 fn execute_grouped_page_with_trace<E>(
663 &self,
664 query: &Query<E>,
665 cursor_token: Option<&str>,
666 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
667 where
668 E: PersistedRow<Canister = C> + EntityValue,
669 {
670 let plan = self
672 .compile_query_with_visible_indexes(query)?
673 .into_prepared_execution_plan();
674
675 self.execute_grouped_plan_with_trace(plan, cursor_token)
677 }
678
679 fn execute_grouped_plan_with_trace<E>(
681 &self,
682 plan: PreparedExecutionPlan<E>,
683 cursor_token: Option<&str>,
684 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
685 where
686 E: PersistedRow<Canister = C> + EntityValue,
687 {
688 Self::ensure_grouped_execution_family(
690 plan.execution_family().map_err(QueryError::execute)?,
691 )?;
692
693 let cursor = decode_optional_grouped_cursor_token(cursor_token)
695 .map_err(QueryError::from_cursor_plan_error)?;
696 let cursor = plan
697 .prepare_grouped_cursor_token(cursor)
698 .map_err(QueryError::from_executor_plan_error)?;
699
700 self.with_metrics(|| {
703 self.load_executor::<E>()
704 .execute_grouped_paged_with_cursor_traced(plan, cursor)
705 })
706 .map_err(QueryError::execute)
707 }
708}