1use crate::{
8 db::{
9 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
10 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11 access::AccessStrategy,
12 commit::CommitSchemaFingerprint,
13 cursor::{
14 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
15 },
16 diagnostics::ExecutionTrace,
17 executor::{
18 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
19 SharedPreparedExecutionPlan,
20 },
21 query::builder::{
22 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
23 },
24 query::explain::{
25 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
26 },
27 query::{
28 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
29 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
30 },
31 },
32 error::InternalError,
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35};
36#[cfg(feature = "perf-attribution")]
37use candid::CandidType;
38#[cfg(feature = "perf-attribution")]
39use serde::Deserialize;
40use std::{cell::RefCell, collections::HashMap};
41
42#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
43pub(in crate::db) enum QueryPlanVisibility {
44 StoreNotReady,
45 StoreReady,
46}
47
48#[derive(Clone, Debug, Eq, Hash, PartialEq)]
49pub(in crate::db) struct QueryPlanCacheKey {
50 entity_path: &'static str,
51 schema_fingerprint: CommitSchemaFingerprint,
52 visibility: QueryPlanVisibility,
53 query_fingerprint: [u8; 32],
54}
55
56#[derive(Clone, Debug)]
57pub(in crate::db) struct QueryPlanCacheEntry {
58 logical_plan: AccessPlannedQuery,
59 prepared_plan: SharedPreparedExecutionPlan,
60}
61
62impl QueryPlanCacheEntry {
63 #[must_use]
64 pub(in crate::db) const fn new(
65 logical_plan: AccessPlannedQuery,
66 prepared_plan: SharedPreparedExecutionPlan,
67 ) -> Self {
68 Self {
69 logical_plan,
70 prepared_plan,
71 }
72 }
73
74 #[must_use]
75 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
76 &self.logical_plan
77 }
78
79 #[must_use]
80 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
81 self.prepared_plan.typed_clone::<E>()
82 }
83}
84
85pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, QueryPlanCacheEntry>;
86
87thread_local! {
88 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
93 RefCell::new(HashMap::new());
94}
95
96#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
97pub(in crate::db) struct QueryPlanCacheAttribution {
98 pub hits: u64,
99 pub misses: u64,
100}
101
102impl QueryPlanCacheAttribution {
103 #[must_use]
104 const fn hit() -> Self {
105 Self { hits: 1, misses: 0 }
106 }
107
108 #[must_use]
109 const fn miss() -> Self {
110 Self { hits: 0, misses: 1 }
111 }
112}
113
114#[cfg(feature = "perf-attribution")]
121#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
122pub struct QueryExecutionAttribution {
123 pub compile_local_instructions: u64,
124 pub execute_local_instructions: u64,
125 pub total_local_instructions: u64,
126 pub shared_query_plan_cache_hits: u64,
127 pub shared_query_plan_cache_misses: u64,
128}
129
130#[cfg(feature = "perf-attribution")]
131#[expect(
132 clippy::missing_const_for_fn,
133 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
134)]
135fn read_query_local_instruction_counter() -> u64 {
136 #[cfg(target_arch = "wasm32")]
137 {
138 canic_cdk::api::performance_counter(1)
139 }
140
141 #[cfg(not(target_arch = "wasm32"))]
142 {
143 0
144 }
145}
146
147#[cfg(feature = "perf-attribution")]
148fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
149 let start = read_query_local_instruction_counter();
150 let result = run();
151 let delta = read_query_local_instruction_counter().saturating_sub(start);
152
153 (delta, result)
154}
155
156impl<C: CanisterKind> DbSession<C> {
157 fn query_plan_cache_scope_id(&self) -> usize {
158 self.db.cache_scope_id()
159 }
160
161 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
162 let scope_id = self.query_plan_cache_scope_id();
163
164 QUERY_PLAN_CACHES.with(|caches| {
165 let mut caches = caches.borrow_mut();
166 let cache = caches.entry(scope_id).or_default();
167
168 f(cache)
169 })
170 }
171
172 const fn visible_indexes_for_model(
173 model: &'static EntityModel,
174 visibility: QueryPlanVisibility,
175 ) -> VisibleIndexes<'static> {
176 match visibility {
177 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
178 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
179 }
180 }
181
182 #[cfg(test)]
183 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
184 self.with_query_plan_cache(|cache| cache.len())
185 }
186
187 #[cfg(test)]
188 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
189 self.with_query_plan_cache(QueryPlanCache::clear);
190 }
191
192 pub(in crate::db) fn query_plan_visibility_for_store_path(
193 &self,
194 store_path: &'static str,
195 ) -> Result<QueryPlanVisibility, QueryError> {
196 let store = self
197 .db
198 .recovered_store(store_path)
199 .map_err(QueryError::execute)?;
200 let visibility = if store.index_state() == crate::db::IndexState::Ready {
201 QueryPlanVisibility::StoreReady
202 } else {
203 QueryPlanVisibility::StoreNotReady
204 };
205
206 Ok(visibility)
207 }
208
209 pub(in crate::db) fn cached_query_plan_entry_for_authority(
210 &self,
211 authority: crate::db::executor::EntityAuthority,
212 schema_fingerprint: CommitSchemaFingerprint,
213 query: &StructuralQuery,
214 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
215 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
216 let cache_key = QueryPlanCacheKey {
217 entity_path: authority.entity_path(),
218 schema_fingerprint,
219 visibility,
220 query_fingerprint: query.cache_fingerprint(),
221 };
222
223 {
224 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
225 if let Some(entry) = cached {
226 return Ok((entry, QueryPlanCacheAttribution::hit()));
227 }
228 }
229
230 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
231 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
232 let entry = QueryPlanCacheEntry::new(
233 plan.clone(),
234 SharedPreparedExecutionPlan::from_plan(authority, plan),
235 );
236 self.with_query_plan_cache(|cache| {
237 cache.insert(cache_key, entry.clone());
238 });
239
240 Ok((entry, QueryPlanCacheAttribution::miss()))
241 }
242
243 pub(in crate::db) fn cached_structural_plan_for_authority(
244 &self,
245 authority: crate::db::executor::EntityAuthority,
246 schema_fingerprint: CommitSchemaFingerprint,
247 query: &StructuralQuery,
248 ) -> Result<AccessPlannedQuery, QueryError> {
249 let (entry, _) =
250 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
251
252 Ok(entry.logical_plan().clone())
253 }
254
255 fn with_query_visible_indexes<E, T>(
258 &self,
259 query: &Query<E>,
260 op: impl FnOnce(
261 &Query<E>,
262 &crate::db::query::plan::VisibleIndexes<'static>,
263 ) -> Result<T, QueryError>,
264 ) -> Result<T, QueryError>
265 where
266 E: EntityKind<Canister = C>,
267 {
268 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
269 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
270
271 op(query, &visible_indexes)
272 }
273
274 fn cached_structural_plan_for_entity<E>(
277 &self,
278 query: &StructuralQuery,
279 ) -> Result<AccessPlannedQuery, QueryError>
280 where
281 E: EntityKind<Canister = C>,
282 {
283 self.cached_structural_plan_for_authority(
284 crate::db::executor::EntityAuthority::for_type::<E>(),
285 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
286 query,
287 )
288 }
289
290 fn cached_prepared_query_plan_for_entity<E>(
291 &self,
292 query: &StructuralQuery,
293 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
294 where
295 E: EntityKind<Canister = C>,
296 {
297 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
298 crate::db::executor::EntityAuthority::for_type::<E>(),
299 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
300 query,
301 )?;
302
303 Ok((entry.typed_prepared_plan::<E>(), attribution))
304 }
305
306 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
309 &self,
310 query: &Query<E>,
311 ) -> Result<CompiledQuery<E>, QueryError>
312 where
313 E: EntityKind<Canister = C>,
314 {
315 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
316
317 Ok(Query::<E>::compiled_query_from_plan(plan))
318 }
319
320 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
323 &self,
324 query: &Query<E>,
325 ) -> Result<PlannedQuery<E>, QueryError>
326 where
327 E: EntityKind<Canister = C>,
328 {
329 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
330
331 Ok(Query::<E>::planned_query_from_plan(plan))
332 }
333
334 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
336 &self,
337 query: &Query<E>,
338 ) -> Result<ExplainPlan, QueryError>
339 where
340 E: EntityKind<Canister = C>,
341 {
342 self.with_query_visible_indexes(query, |query, visible_indexes| {
343 query.explain_with_visible_indexes(visible_indexes)
344 })
345 }
346
347 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
350 &self,
351 query: &Query<E>,
352 ) -> Result<String, QueryError>
353 where
354 E: EntityKind<Canister = C>,
355 {
356 self.with_query_visible_indexes(query, |query, visible_indexes| {
357 query.plan_hash_hex_with_visible_indexes(visible_indexes)
358 })
359 }
360
361 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
364 &self,
365 query: &Query<E>,
366 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
367 where
368 E: EntityValue + EntityKind<Canister = C>,
369 {
370 self.with_query_visible_indexes(query, |query, visible_indexes| {
371 query.explain_execution_with_visible_indexes(visible_indexes)
372 })
373 }
374
375 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
378 &self,
379 query: &Query<E>,
380 ) -> Result<String, QueryError>
381 where
382 E: EntityValue + EntityKind<Canister = C>,
383 {
384 self.with_query_visible_indexes(query, |query, visible_indexes| {
385 query.explain_execution_text_with_visible_indexes(visible_indexes)
386 })
387 }
388
389 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
392 &self,
393 query: &Query<E>,
394 ) -> Result<String, QueryError>
395 where
396 E: EntityValue + EntityKind<Canister = C>,
397 {
398 self.with_query_visible_indexes(query, |query, visible_indexes| {
399 query.explain_execution_json_with_visible_indexes(visible_indexes)
400 })
401 }
402
403 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
406 &self,
407 query: &Query<E>,
408 ) -> Result<String, QueryError>
409 where
410 E: EntityValue + EntityKind<Canister = C>,
411 {
412 self.with_query_visible_indexes(query, |query, visible_indexes| {
413 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
414 })
415 }
416
417 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
420 &self,
421 query: &Query<E>,
422 strategy: &S,
423 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
424 where
425 E: EntityValue + EntityKind<Canister = C>,
426 S: PreparedFluentAggregateExplainStrategy,
427 {
428 self.with_query_visible_indexes(query, |query, visible_indexes| {
429 query
430 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
431 })
432 }
433
434 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
437 &self,
438 query: &Query<E>,
439 target_field: &str,
440 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
441 where
442 E: EntityValue + EntityKind<Canister = C>,
443 {
444 self.with_query_visible_indexes(query, |query, visible_indexes| {
445 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
446 })
447 }
448
449 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
452 &self,
453 query: &Query<E>,
454 strategy: &PreparedFluentProjectionStrategy,
455 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
456 where
457 E: EntityValue + EntityKind<Canister = C>,
458 {
459 self.with_query_visible_indexes(query, |query, visible_indexes| {
460 query.explain_prepared_projection_terminal_with_visible_indexes(
461 visible_indexes,
462 strategy,
463 )
464 })
465 }
466
467 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
470 match family {
471 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
472 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
473 )),
474 ExecutionFamily::Ordered => Ok(()),
475 ExecutionFamily::Grouped => Err(QueryError::invariant(
476 "grouped queries execute via execute(), not page().execute()",
477 )),
478 }
479 }
480
481 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
484 match family {
485 ExecutionFamily::Grouped => Ok(()),
486 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
487 "grouped execution requires grouped logical plans",
488 )),
489 }
490 }
491
492 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
494 where
495 E: PersistedRow<Canister = C> + EntityValue,
496 {
497 let mode = query.mode();
499 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
500
501 self.execute_query_dyn(mode, plan)
503 }
504
505 #[cfg(feature = "perf-attribution")]
508 #[doc(hidden)]
509 pub fn execute_query_result_with_attribution<E>(
510 &self,
511 query: &Query<E>,
512 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
513 where
514 E: PersistedRow<Canister = C> + EntityValue,
515 {
516 let (compile_local_instructions, compiled) =
518 measure_query_stage(|| self.compile_query_with_visible_indexes(query));
519 let _compiled = compiled?;
520
521 let (execute_local_instructions, result_and_cache) = measure_query_stage(|| {
523 let (plan, cache_attribution) =
524 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
525
526 if query.has_grouping() {
527 self.execute_grouped_plan_with_trace(plan, None)
528 .map(|(page, trace)| {
529 let next_cursor = page
530 .next_cursor
531 .map(|token| {
532 let Some(token) = token.as_grouped() else {
533 return Err(
534 QueryError::grouped_paged_emitted_scalar_continuation(),
535 );
536 };
537
538 token.encode().map_err(|err| {
539 QueryError::serialize_internal(format!(
540 "failed to serialize grouped continuation cursor: {err}"
541 ))
542 })
543 })
544 .transpose()?;
545
546 Ok::<(LoadQueryResult<E>, QueryPlanCacheAttribution), QueryError>((
547 LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
548 page.rows,
549 next_cursor,
550 trace,
551 )),
552 cache_attribution,
553 ))
554 })?
555 } else {
556 self.execute_query_dyn(query.mode(), plan)
557 .map(LoadQueryResult::Rows)
558 .map(|result| (result, cache_attribution))
559 }
560 });
561 let (result, cache_attribution) = result_and_cache?;
562 let total_local_instructions =
563 compile_local_instructions.saturating_add(execute_local_instructions);
564
565 Ok((
566 result,
567 QueryExecutionAttribution {
568 compile_local_instructions,
569 execute_local_instructions,
570 total_local_instructions,
571 shared_query_plan_cache_hits: cache_attribution.hits,
572 shared_query_plan_cache_misses: cache_attribution.misses,
573 },
574 ))
575 }
576
577 #[doc(hidden)]
580 pub fn execute_query_result<E>(
581 &self,
582 query: &Query<E>,
583 ) -> Result<LoadQueryResult<E>, QueryError>
584 where
585 E: PersistedRow<Canister = C> + EntityValue,
586 {
587 if query.has_grouping() {
588 return self
589 .execute_grouped(query, None)
590 .map(LoadQueryResult::Grouped);
591 }
592
593 self.execute_query(query).map(LoadQueryResult::Rows)
594 }
595
596 #[doc(hidden)]
598 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
599 where
600 E: PersistedRow<Canister = C> + EntityValue,
601 {
602 if !query.mode().is_delete() {
604 return Err(QueryError::unsupported_query(
605 "delete count execution requires delete query mode",
606 ));
607 }
608
609 let plan = self
611 .compile_query_with_visible_indexes(query)?
612 .into_prepared_execution_plan();
613
614 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
616 .map_err(QueryError::execute)
617 }
618
619 pub(in crate::db) fn execute_query_dyn<E>(
624 &self,
625 mode: QueryMode,
626 plan: PreparedExecutionPlan<E>,
627 ) -> Result<EntityResponse<E>, QueryError>
628 where
629 E: PersistedRow<Canister = C> + EntityValue,
630 {
631 let result = match mode {
632 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
633 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
634 };
635
636 result.map_err(QueryError::execute)
637 }
638
639 pub(in crate::db) fn execute_load_query_with<E, T>(
642 &self,
643 query: &Query<E>,
644 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
645 ) -> Result<T, QueryError>
646 where
647 E: PersistedRow<Canister = C> + EntityValue,
648 {
649 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
650
651 self.with_metrics(|| op(self.load_executor::<E>(), plan))
652 .map_err(QueryError::execute)
653 }
654
655 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
660 where
661 E: EntityKind<Canister = C>,
662 {
663 let compiled = self.compile_query_with_visible_indexes(query)?;
664 let explain = compiled.explain();
665 let plan_hash = compiled.plan_hash_hex();
666
667 let (executable, _) =
668 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
669 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
670 let execution_family = match query.mode() {
671 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
672 QueryMode::Delete(_) => None,
673 };
674
675 Ok(QueryTracePlan::new(
676 plan_hash,
677 access_strategy,
678 execution_family,
679 explain,
680 ))
681 }
682
683 pub(crate) fn execute_load_query_paged_with_trace<E>(
685 &self,
686 query: &Query<E>,
687 cursor_token: Option<&str>,
688 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
689 where
690 E: PersistedRow<Canister = C> + EntityValue,
691 {
692 let plan = self
694 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
695 .0;
696 Self::ensure_scalar_paged_execution_family(
697 plan.execution_family().map_err(QueryError::execute)?,
698 )?;
699
700 let cursor_bytes = decode_optional_cursor_token(cursor_token)
702 .map_err(QueryError::from_cursor_plan_error)?;
703 let cursor = plan
704 .prepare_cursor(cursor_bytes.as_deref())
705 .map_err(QueryError::from_executor_plan_error)?;
706
707 let (page, trace) = self
709 .with_metrics(|| {
710 self.load_executor::<E>()
711 .execute_paged_with_cursor_traced(plan, cursor)
712 })
713 .map_err(QueryError::execute)?;
714 let next_cursor = page
715 .next_cursor
716 .map(|token| {
717 let Some(token) = token.as_scalar() else {
718 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
719 };
720
721 token.encode().map_err(|err| {
722 QueryError::serialize_internal(format!(
723 "failed to serialize continuation cursor: {err}"
724 ))
725 })
726 })
727 .transpose()?;
728
729 Ok(PagedLoadExecutionWithTrace::new(
730 page.items,
731 next_cursor,
732 trace,
733 ))
734 }
735
736 pub(in crate::db) fn execute_grouped<E>(
741 &self,
742 query: &Query<E>,
743 cursor_token: Option<&str>,
744 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
745 where
746 E: PersistedRow<Canister = C> + EntityValue,
747 {
748 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
749 let next_cursor = page
750 .next_cursor
751 .map(|token| {
752 let Some(token) = token.as_grouped() else {
753 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
754 };
755
756 token.encode().map_err(|err| {
757 QueryError::serialize_internal(format!(
758 "failed to serialize grouped continuation cursor: {err}"
759 ))
760 })
761 })
762 .transpose()?;
763
764 Ok(PagedGroupedExecutionWithTrace::new(
765 page.rows,
766 next_cursor,
767 trace,
768 ))
769 }
770
771 fn execute_grouped_page_with_trace<E>(
774 &self,
775 query: &Query<E>,
776 cursor_token: Option<&str>,
777 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
778 where
779 E: PersistedRow<Canister = C> + EntityValue,
780 {
781 let plan = self
783 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
784 .0;
785
786 self.execute_grouped_plan_with_trace(plan, cursor_token)
788 }
789
790 fn execute_grouped_plan_with_trace<E>(
792 &self,
793 plan: PreparedExecutionPlan<E>,
794 cursor_token: Option<&str>,
795 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
796 where
797 E: PersistedRow<Canister = C> + EntityValue,
798 {
799 Self::ensure_grouped_execution_family(
801 plan.execution_family().map_err(QueryError::execute)?,
802 )?;
803
804 let cursor = decode_optional_grouped_cursor_token(cursor_token)
806 .map_err(QueryError::from_cursor_plan_error)?;
807 let cursor = plan
808 .prepare_grouped_cursor_token(cursor)
809 .map_err(QueryError::from_executor_plan_error)?;
810
811 self.with_metrics(|| {
814 self.load_executor::<E>()
815 .execute_grouped_paged_with_cursor_traced(plan, cursor)
816 })
817 .map_err(QueryError::execute)
818 }
819}