1#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10 db::{
11 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13 access::AccessStrategy,
14 commit::CommitSchemaFingerprint,
15 cursor::{
16 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17 },
18 diagnostics::ExecutionTrace,
19 executor::{
20 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21 SharedPreparedExecutionPlan,
22 },
23 query::builder::{
24 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25 },
26 query::explain::{
27 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28 },
29 query::{
30 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32 },
33 },
34 error::InternalError,
35 model::entity::EntityModel,
36 traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
45type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
47const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
51#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) enum QueryPlanVisibility {
53 StoreNotReady,
54 StoreReady,
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
58pub(in crate::db) struct QueryPlanCacheKey {
59 cache_method_version: u8,
60 entity_path: &'static str,
61 schema_fingerprint: CommitSchemaFingerprint,
62 visibility: QueryPlanVisibility,
63 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
64}
65
66#[derive(Clone, Debug)]
67pub(in crate::db) struct QueryPlanCacheEntry {
68 logical_plan: AccessPlannedQuery,
69 prepared_plan: SharedPreparedExecutionPlan,
70}
71
72impl QueryPlanCacheEntry {
73 #[must_use]
74 pub(in crate::db) const fn new(
75 logical_plan: AccessPlannedQuery,
76 prepared_plan: SharedPreparedExecutionPlan,
77 ) -> Self {
78 Self {
79 logical_plan,
80 prepared_plan,
81 }
82 }
83
84 #[must_use]
85 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
86 &self.logical_plan
87 }
88
89 #[must_use]
90 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
91 self.prepared_plan.typed_clone::<E>()
92 }
93}
94
95pub(in crate::db) type QueryPlanCache =
96 HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
97
98thread_local! {
99 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
104 RefCell::new(HashMap::default());
105}
106
107#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
108pub(in crate::db) struct QueryPlanCacheAttribution {
109 pub hits: u64,
110 pub misses: u64,
111}
112
113impl QueryPlanCacheAttribution {
114 #[must_use]
115 const fn hit() -> Self {
116 Self { hits: 1, misses: 0 }
117 }
118
119 #[must_use]
120 const fn miss() -> Self {
121 Self { hits: 0, misses: 1 }
122 }
123}
124
125#[cfg(feature = "perf-attribution")]
132#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
133pub struct QueryExecutionAttribution {
134 pub compile_local_instructions: u64,
135 pub runtime_local_instructions: u64,
136 pub finalize_local_instructions: u64,
137 pub direct_data_row_scan_local_instructions: u64,
138 pub direct_data_row_key_stream_local_instructions: u64,
139 pub direct_data_row_row_read_local_instructions: u64,
140 pub direct_data_row_key_encode_local_instructions: u64,
141 pub direct_data_row_store_get_local_instructions: u64,
142 pub direct_data_row_order_window_local_instructions: u64,
143 pub direct_data_row_page_window_local_instructions: u64,
144 pub response_decode_local_instructions: u64,
145 pub execute_local_instructions: u64,
146 pub total_local_instructions: u64,
147 pub shared_query_plan_cache_hits: u64,
148 pub shared_query_plan_cache_misses: u64,
149}
150
151#[cfg(feature = "perf-attribution")]
152#[expect(
153 clippy::missing_const_for_fn,
154 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
155)]
156fn read_query_local_instruction_counter() -> u64 {
157 #[cfg(target_arch = "wasm32")]
158 {
159 canic_cdk::api::performance_counter(1)
160 }
161
162 #[cfg(not(target_arch = "wasm32"))]
163 {
164 0
165 }
166}
167
168#[cfg(feature = "perf-attribution")]
169fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
170 let start = read_query_local_instruction_counter();
171 let result = run();
172 let delta = read_query_local_instruction_counter().saturating_sub(start);
173
174 (delta, result)
175}
176
177impl<C: CanisterKind> DbSession<C> {
178 #[cfg(feature = "perf-attribution")]
179 const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
180 ScalarExecutePhaseAttribution {
181 runtime_local_instructions: 0,
182 finalize_local_instructions: 0,
183 direct_data_row_scan_local_instructions: 0,
184 direct_data_row_key_stream_local_instructions: 0,
185 direct_data_row_row_read_local_instructions: 0,
186 direct_data_row_key_encode_local_instructions: 0,
187 direct_data_row_store_get_local_instructions: 0,
188 direct_data_row_order_window_local_instructions: 0,
189 direct_data_row_page_window_local_instructions: 0,
190 }
191 }
192
193 fn query_plan_cache_scope_id(&self) -> usize {
194 self.db.cache_scope_id()
195 }
196
197 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
198 let scope_id = self.query_plan_cache_scope_id();
199
200 QUERY_PLAN_CACHES.with(|caches| {
201 let mut caches = caches.borrow_mut();
202 let cache = caches.entry(scope_id).or_default();
203
204 f(cache)
205 })
206 }
207
208 const fn visible_indexes_for_model(
209 model: &'static EntityModel,
210 visibility: QueryPlanVisibility,
211 ) -> VisibleIndexes<'static> {
212 match visibility {
213 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
214 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
215 }
216 }
217
218 #[cfg(test)]
219 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
220 self.with_query_plan_cache(|cache| cache.len())
221 }
222
223 #[cfg(test)]
224 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
225 self.with_query_plan_cache(QueryPlanCache::clear);
226 }
227
228 pub(in crate::db) fn query_plan_visibility_for_store_path(
229 &self,
230 store_path: &'static str,
231 ) -> Result<QueryPlanVisibility, QueryError> {
232 let store = self
233 .db
234 .recovered_store(store_path)
235 .map_err(QueryError::execute)?;
236 let visibility = if store.index_state() == crate::db::IndexState::Ready {
237 QueryPlanVisibility::StoreReady
238 } else {
239 QueryPlanVisibility::StoreNotReady
240 };
241
242 Ok(visibility)
243 }
244
245 pub(in crate::db) fn cached_query_plan_entry_for_authority(
246 &self,
247 authority: crate::db::executor::EntityAuthority,
248 schema_fingerprint: CommitSchemaFingerprint,
249 query: &StructuralQuery,
250 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
251 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
252 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
253 let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
254 let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
255 authority,
256 schema_fingerprint,
257 visibility,
258 query,
259 normalized_predicate.as_ref(),
260 );
261
262 {
263 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
264 if let Some(entry) = cached {
265 return Ok((entry, QueryPlanCacheAttribution::hit()));
266 }
267 }
268
269 let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
270 &visible_indexes,
271 normalized_predicate,
272 )?;
273 let entry = QueryPlanCacheEntry::new(
274 plan.clone(),
275 SharedPreparedExecutionPlan::from_plan(authority, plan),
276 );
277 self.with_query_plan_cache(|cache| {
278 cache.insert(cache_key, entry.clone());
279 });
280
281 Ok((entry, QueryPlanCacheAttribution::miss()))
282 }
283
284 #[cfg(test)]
285 pub(in crate::db) fn query_plan_cache_key_for_tests(
286 authority: crate::db::executor::EntityAuthority,
287 schema_fingerprint: CommitSchemaFingerprint,
288 visibility: QueryPlanVisibility,
289 query: &StructuralQuery,
290 cache_method_version: u8,
291 ) -> QueryPlanCacheKey {
292 QueryPlanCacheKey::for_authority_with_method_version(
293 authority,
294 schema_fingerprint,
295 visibility,
296 query,
297 cache_method_version,
298 )
299 }
300
301 pub(in crate::db) fn cached_structural_plan_for_authority(
302 &self,
303 authority: crate::db::executor::EntityAuthority,
304 schema_fingerprint: CommitSchemaFingerprint,
305 query: &StructuralQuery,
306 ) -> Result<AccessPlannedQuery, QueryError> {
307 let (entry, _) =
308 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
309
310 Ok(entry.logical_plan().clone())
311 }
312
313 fn with_query_visible_indexes<E, T>(
316 &self,
317 query: &Query<E>,
318 op: impl FnOnce(
319 &Query<E>,
320 &crate::db::query::plan::VisibleIndexes<'static>,
321 ) -> Result<T, QueryError>,
322 ) -> Result<T, QueryError>
323 where
324 E: EntityKind<Canister = C>,
325 {
326 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
327 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
328
329 op(query, &visible_indexes)
330 }
331
332 fn cached_structural_plan_for_entity<E>(
335 &self,
336 query: &StructuralQuery,
337 ) -> Result<AccessPlannedQuery, QueryError>
338 where
339 E: EntityKind<Canister = C>,
340 {
341 self.cached_structural_plan_for_authority(
342 crate::db::executor::EntityAuthority::for_type::<E>(),
343 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
344 query,
345 )
346 }
347
348 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
349 &self,
350 query: &StructuralQuery,
351 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
352 where
353 E: EntityKind<Canister = C>,
354 {
355 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
356 crate::db::executor::EntityAuthority::for_type::<E>(),
357 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
358 query,
359 )?;
360
361 Ok((entry.typed_prepared_plan::<E>(), attribution))
362 }
363
364 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
367 &self,
368 query: &Query<E>,
369 ) -> Result<CompiledQuery<E>, QueryError>
370 where
371 E: EntityKind<Canister = C>,
372 {
373 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
374
375 Ok(Query::<E>::compiled_query_from_plan(plan))
376 }
377
378 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
381 &self,
382 query: &Query<E>,
383 ) -> Result<PlannedQuery<E>, QueryError>
384 where
385 E: EntityKind<Canister = C>,
386 {
387 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
388
389 Ok(Query::<E>::planned_query_from_plan(plan))
390 }
391
392 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
394 &self,
395 query: &Query<E>,
396 ) -> Result<ExplainPlan, QueryError>
397 where
398 E: EntityKind<Canister = C>,
399 {
400 self.with_query_visible_indexes(query, |query, visible_indexes| {
401 query.explain_with_visible_indexes(visible_indexes)
402 })
403 }
404
405 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
408 &self,
409 query: &Query<E>,
410 ) -> Result<String, QueryError>
411 where
412 E: EntityKind<Canister = C>,
413 {
414 self.with_query_visible_indexes(query, |query, visible_indexes| {
415 query.plan_hash_hex_with_visible_indexes(visible_indexes)
416 })
417 }
418
419 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
422 &self,
423 query: &Query<E>,
424 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
425 where
426 E: EntityValue + EntityKind<Canister = C>,
427 {
428 self.with_query_visible_indexes(query, |query, visible_indexes| {
429 query.explain_execution_with_visible_indexes(visible_indexes)
430 })
431 }
432
433 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
436 &self,
437 query: &Query<E>,
438 ) -> Result<String, QueryError>
439 where
440 E: EntityValue + EntityKind<Canister = C>,
441 {
442 self.with_query_visible_indexes(query, |query, visible_indexes| {
443 query.explain_execution_text_with_visible_indexes(visible_indexes)
444 })
445 }
446
447 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
450 &self,
451 query: &Query<E>,
452 ) -> Result<String, QueryError>
453 where
454 E: EntityValue + EntityKind<Canister = C>,
455 {
456 self.with_query_visible_indexes(query, |query, visible_indexes| {
457 query.explain_execution_json_with_visible_indexes(visible_indexes)
458 })
459 }
460
461 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
464 &self,
465 query: &Query<E>,
466 ) -> Result<String, QueryError>
467 where
468 E: EntityValue + EntityKind<Canister = C>,
469 {
470 self.with_query_visible_indexes(query, |query, visible_indexes| {
471 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
472 })
473 }
474
475 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
478 &self,
479 query: &Query<E>,
480 strategy: &S,
481 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
482 where
483 E: EntityValue + EntityKind<Canister = C>,
484 S: PreparedFluentAggregateExplainStrategy,
485 {
486 self.with_query_visible_indexes(query, |query, visible_indexes| {
487 query
488 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
489 })
490 }
491
492 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
495 &self,
496 query: &Query<E>,
497 target_field: &str,
498 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
499 where
500 E: EntityValue + EntityKind<Canister = C>,
501 {
502 self.with_query_visible_indexes(query, |query, visible_indexes| {
503 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
504 })
505 }
506
507 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
510 &self,
511 query: &Query<E>,
512 strategy: &PreparedFluentProjectionStrategy,
513 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
514 where
515 E: EntityValue + EntityKind<Canister = C>,
516 {
517 self.with_query_visible_indexes(query, |query, visible_indexes| {
518 query.explain_prepared_projection_terminal_with_visible_indexes(
519 visible_indexes,
520 strategy,
521 )
522 })
523 }
524
525 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
528 match family {
529 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
530 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
531 )),
532 ExecutionFamily::Ordered => Ok(()),
533 ExecutionFamily::Grouped => Err(QueryError::invariant(
534 "grouped queries execute via execute(), not page().execute()",
535 )),
536 }
537 }
538
539 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
542 match family {
543 ExecutionFamily::Grouped => Ok(()),
544 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
545 "grouped execution requires grouped logical plans",
546 )),
547 }
548 }
549
550 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
552 where
553 E: PersistedRow<Canister = C> + EntityValue,
554 {
555 let mode = query.mode();
557 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
558
559 self.execute_query_dyn(mode, plan)
561 }
562
563 #[cfg(feature = "perf-attribution")]
566 #[doc(hidden)]
567 pub fn execute_query_result_with_attribution<E>(
568 &self,
569 query: &Query<E>,
570 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
571 where
572 E: PersistedRow<Canister = C> + EntityValue,
573 {
574 let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
579 self.cached_prepared_query_plan_for_entity::<E>(query.structural())
580 });
581 let (plan, cache_attribution) = plan_and_cache?;
582
583 let (execute_local_instructions, result) = measure_query_stage(|| {
586 if query.has_grouping() {
587 self.execute_grouped_plan_with_trace(plan, None)
588 .map(|(page, trace)| {
589 let next_cursor = page
590 .next_cursor
591 .map(|token| {
592 let Some(token) = token.as_grouped() else {
593 return Err(
594 QueryError::grouped_paged_emitted_scalar_continuation(),
595 );
596 };
597
598 token.encode().map_err(|err| {
599 QueryError::serialize_internal(format!(
600 "failed to serialize grouped continuation cursor: {err}"
601 ))
602 })
603 })
604 .transpose()?;
605
606 Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
607 (
608 LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
609 page.rows,
610 next_cursor,
611 trace,
612 )),
613 Self::empty_scalar_execute_phase_attribution(),
614 0,
615 ),
616 )
617 })?
618 } else {
619 match query.mode() {
620 QueryMode::Load(_) => {
621 let (rows, phase_attribution, response_decode_local_instructions) = self
622 .load_executor::<E>()
623 .execute_with_phase_attribution(plan)
624 .map_err(QueryError::execute)?;
625
626 Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
627 (
628 LoadQueryResult::Rows(rows),
629 phase_attribution,
630 response_decode_local_instructions,
631 ),
632 )
633 }
634 QueryMode::Delete(_) => {
635 let result = self.execute_query_dyn(query.mode(), plan)?;
636
637 Ok((
638 LoadQueryResult::Rows(result),
639 Self::empty_scalar_execute_phase_attribution(),
640 0,
641 ))
642 }
643 }
644 }
645 });
646 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
647 let total_local_instructions =
648 compile_local_instructions.saturating_add(execute_local_instructions);
649
650 Ok((
651 result,
652 QueryExecutionAttribution {
653 compile_local_instructions,
654 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
655 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
656 direct_data_row_scan_local_instructions: execute_phase_attribution
657 .direct_data_row_scan_local_instructions,
658 direct_data_row_key_stream_local_instructions: execute_phase_attribution
659 .direct_data_row_key_stream_local_instructions,
660 direct_data_row_row_read_local_instructions: execute_phase_attribution
661 .direct_data_row_row_read_local_instructions,
662 direct_data_row_key_encode_local_instructions: execute_phase_attribution
663 .direct_data_row_key_encode_local_instructions,
664 direct_data_row_store_get_local_instructions: execute_phase_attribution
665 .direct_data_row_store_get_local_instructions,
666 direct_data_row_order_window_local_instructions: execute_phase_attribution
667 .direct_data_row_order_window_local_instructions,
668 direct_data_row_page_window_local_instructions: execute_phase_attribution
669 .direct_data_row_page_window_local_instructions,
670 response_decode_local_instructions,
671 execute_local_instructions,
672 total_local_instructions,
673 shared_query_plan_cache_hits: cache_attribution.hits,
674 shared_query_plan_cache_misses: cache_attribution.misses,
675 },
676 ))
677 }
678
679 #[doc(hidden)]
682 pub fn execute_query_result<E>(
683 &self,
684 query: &Query<E>,
685 ) -> Result<LoadQueryResult<E>, QueryError>
686 where
687 E: PersistedRow<Canister = C> + EntityValue,
688 {
689 if query.has_grouping() {
690 return self
691 .execute_grouped(query, None)
692 .map(LoadQueryResult::Grouped);
693 }
694
695 self.execute_query(query).map(LoadQueryResult::Rows)
696 }
697
698 #[doc(hidden)]
700 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
701 where
702 E: PersistedRow<Canister = C> + EntityValue,
703 {
704 if !query.mode().is_delete() {
706 return Err(QueryError::unsupported_query(
707 "delete count execution requires delete query mode",
708 ));
709 }
710
711 let plan = self
713 .compile_query_with_visible_indexes(query)?
714 .into_prepared_execution_plan();
715
716 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
718 .map_err(QueryError::execute)
719 }
720
721 pub(in crate::db) fn execute_query_dyn<E>(
726 &self,
727 mode: QueryMode,
728 plan: PreparedExecutionPlan<E>,
729 ) -> Result<EntityResponse<E>, QueryError>
730 where
731 E: PersistedRow<Canister = C> + EntityValue,
732 {
733 let result = match mode {
734 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
735 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
736 };
737
738 result.map_err(QueryError::execute)
739 }
740
741 pub(in crate::db) fn execute_load_query_with<E, T>(
744 &self,
745 query: &Query<E>,
746 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
747 ) -> Result<T, QueryError>
748 where
749 E: PersistedRow<Canister = C> + EntityValue,
750 {
751 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
752
753 self.with_metrics(|| op(self.load_executor::<E>(), plan))
754 .map_err(QueryError::execute)
755 }
756
757 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
762 where
763 E: EntityKind<Canister = C>,
764 {
765 let compiled = self.compile_query_with_visible_indexes(query)?;
766 let explain = compiled.explain();
767 let plan_hash = compiled.plan_hash_hex();
768
769 let (executable, _) =
770 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
771 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
772 let execution_family = match query.mode() {
773 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
774 QueryMode::Delete(_) => None,
775 };
776
777 Ok(QueryTracePlan::new(
778 plan_hash,
779 access_strategy,
780 execution_family,
781 explain,
782 ))
783 }
784
785 pub(crate) fn execute_load_query_paged_with_trace<E>(
787 &self,
788 query: &Query<E>,
789 cursor_token: Option<&str>,
790 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
791 where
792 E: PersistedRow<Canister = C> + EntityValue,
793 {
794 let plan = self
796 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
797 .0;
798 Self::ensure_scalar_paged_execution_family(
799 plan.execution_family().map_err(QueryError::execute)?,
800 )?;
801
802 let cursor_bytes = decode_optional_cursor_token(cursor_token)
804 .map_err(QueryError::from_cursor_plan_error)?;
805 let cursor = plan
806 .prepare_cursor(cursor_bytes.as_deref())
807 .map_err(QueryError::from_executor_plan_error)?;
808
809 let (page, trace) = self
811 .with_metrics(|| {
812 self.load_executor::<E>()
813 .execute_paged_with_cursor_traced(plan, cursor)
814 })
815 .map_err(QueryError::execute)?;
816 let next_cursor = page
817 .next_cursor
818 .map(|token| {
819 let Some(token) = token.as_scalar() else {
820 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
821 };
822
823 token.encode().map_err(|err| {
824 QueryError::serialize_internal(format!(
825 "failed to serialize continuation cursor: {err}"
826 ))
827 })
828 })
829 .transpose()?;
830
831 Ok(PagedLoadExecutionWithTrace::new(
832 page.items,
833 next_cursor,
834 trace,
835 ))
836 }
837
838 pub(in crate::db) fn execute_grouped<E>(
843 &self,
844 query: &Query<E>,
845 cursor_token: Option<&str>,
846 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
847 where
848 E: PersistedRow<Canister = C> + EntityValue,
849 {
850 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
851 let next_cursor = page
852 .next_cursor
853 .map(|token| {
854 let Some(token) = token.as_grouped() else {
855 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
856 };
857
858 token.encode().map_err(|err| {
859 QueryError::serialize_internal(format!(
860 "failed to serialize grouped continuation cursor: {err}"
861 ))
862 })
863 })
864 .transpose()?;
865
866 Ok(PagedGroupedExecutionWithTrace::new(
867 page.rows,
868 next_cursor,
869 trace,
870 ))
871 }
872
873 fn execute_grouped_page_with_trace<E>(
876 &self,
877 query: &Query<E>,
878 cursor_token: Option<&str>,
879 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
880 where
881 E: PersistedRow<Canister = C> + EntityValue,
882 {
883 let plan = self
885 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
886 .0;
887
888 self.execute_grouped_plan_with_trace(plan, cursor_token)
890 }
891
892 fn execute_grouped_plan_with_trace<E>(
894 &self,
895 plan: PreparedExecutionPlan<E>,
896 cursor_token: Option<&str>,
897 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
898 where
899 E: PersistedRow<Canister = C> + EntityValue,
900 {
901 Self::ensure_grouped_execution_family(
903 plan.execution_family().map_err(QueryError::execute)?,
904 )?;
905
906 let cursor = decode_optional_grouped_cursor_token(cursor_token)
908 .map_err(QueryError::from_cursor_plan_error)?;
909 let cursor = plan
910 .prepare_grouped_cursor_token(cursor)
911 .map_err(QueryError::from_executor_plan_error)?;
912
913 self.with_metrics(|| {
916 self.load_executor::<E>()
917 .execute_grouped_paged_with_cursor_traced(plan, cursor)
918 })
919 .map_err(QueryError::execute)
920 }
921}
922
923impl QueryPlanCacheKey {
924 fn for_authority_with_normalized_predicate(
925 authority: crate::db::executor::EntityAuthority,
926 schema_fingerprint: CommitSchemaFingerprint,
927 visibility: QueryPlanVisibility,
928 query: &StructuralQuery,
929 normalized_predicate: Option<&crate::db::predicate::Predicate>,
930 ) -> Self {
931 Self::for_authority_with_normalized_predicate_and_method_version(
932 authority,
933 schema_fingerprint,
934 visibility,
935 query,
936 normalized_predicate,
937 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
938 )
939 }
940
941 const fn from_authority_parts(
945 authority: crate::db::executor::EntityAuthority,
946 schema_fingerprint: CommitSchemaFingerprint,
947 visibility: QueryPlanVisibility,
948 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
949 cache_method_version: u8,
950 ) -> Self {
951 Self {
952 cache_method_version,
953 entity_path: authority.entity_path(),
954 schema_fingerprint,
955 visibility,
956 structural_query,
957 }
958 }
959
960 #[cfg(test)]
961 fn for_authority_with_method_version(
962 authority: crate::db::executor::EntityAuthority,
963 schema_fingerprint: CommitSchemaFingerprint,
964 visibility: QueryPlanVisibility,
965 query: &StructuralQuery,
966 cache_method_version: u8,
967 ) -> Self {
968 Self::from_authority_parts(
969 authority,
970 schema_fingerprint,
971 visibility,
972 query.structural_cache_key(),
973 cache_method_version,
974 )
975 }
976
977 fn for_authority_with_normalized_predicate_and_method_version(
978 authority: crate::db::executor::EntityAuthority,
979 schema_fingerprint: CommitSchemaFingerprint,
980 visibility: QueryPlanVisibility,
981 query: &StructuralQuery,
982 normalized_predicate: Option<&crate::db::predicate::Predicate>,
983 cache_method_version: u8,
984 ) -> Self {
985 Self::from_authority_parts(
986 authority,
987 schema_fingerprint,
988 visibility,
989 query.structural_cache_key_with_normalized_predicate(normalized_predicate),
990 cache_method_version,
991 )
992 }
993}