1use crate::{
8 db::{
9 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
10 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11 access::AccessStrategy,
12 commit::CommitSchemaFingerprint,
13 cursor::{
14 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
15 },
16 diagnostics::ExecutionTrace,
17 executor::{
18 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
19 SharedPreparedExecutionPlan,
20 },
21 query::builder::{
22 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
23 },
24 query::explain::{
25 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
26 },
27 query::{
28 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
29 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
30 },
31 },
32 error::InternalError,
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35};
36#[cfg(feature = "perf-attribution")]
37use candid::CandidType;
38use icydb_utils::Xxh3;
39#[cfg(feature = "perf-attribution")]
40use serde::Deserialize;
41use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
42
43type CacheBuildHasher = BuildHasherDefault<Xxh3>;
44
45#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
46pub(in crate::db) enum QueryPlanVisibility {
47 StoreNotReady,
48 StoreReady,
49}
50
51#[derive(Clone, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) struct QueryPlanCacheKey {
53 entity_path: &'static str,
54 schema_fingerprint: CommitSchemaFingerprint,
55 visibility: QueryPlanVisibility,
56 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
57}
58
59#[derive(Clone, Debug)]
60pub(in crate::db) struct QueryPlanCacheEntry {
61 logical_plan: AccessPlannedQuery,
62 prepared_plan: SharedPreparedExecutionPlan,
63}
64
65impl QueryPlanCacheEntry {
66 #[must_use]
67 pub(in crate::db) const fn new(
68 logical_plan: AccessPlannedQuery,
69 prepared_plan: SharedPreparedExecutionPlan,
70 ) -> Self {
71 Self {
72 logical_plan,
73 prepared_plan,
74 }
75 }
76
77 #[must_use]
78 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
79 &self.logical_plan
80 }
81
82 #[must_use]
83 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
84 self.prepared_plan.typed_clone::<E>()
85 }
86}
87
88pub(in crate::db) type QueryPlanCache =
89 HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
90
91thread_local! {
92 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
97 RefCell::new(HashMap::default());
98}
99
100#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
101pub(in crate::db) struct QueryPlanCacheAttribution {
102 pub hits: u64,
103 pub misses: u64,
104}
105
106impl QueryPlanCacheAttribution {
107 #[must_use]
108 const fn hit() -> Self {
109 Self { hits: 1, misses: 0 }
110 }
111
112 #[must_use]
113 const fn miss() -> Self {
114 Self { hits: 0, misses: 1 }
115 }
116}
117
/// Instruction and plan-cache attribution for one attributed query execution.
///
/// Only compiled with the `perf-attribution` feature.
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Counter delta measured while resolving the prepared plan.
    pub compile_local_instructions: u64,
    /// Counter delta measured while executing the prepared plan.
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute deltas.
    pub total_local_instructions: u64,
    /// Plan-cache hits recorded while resolving the plan.
    pub shared_query_plan_cache_hits: u64,
    /// Plan-cache misses recorded while resolving the plan.
    pub shared_query_plan_cache_misses: u64,
}
133
/// Reads the instruction counter used for query stage attribution.
///
/// On `wasm32` this reads runtime performance counter `1`; on every other
/// target it returns `0`, so measured deltas are always zero there.
// NOTE(review): counter type `1` is presumably the call-context instruction
// counter — confirm against the CDK documentation.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
150
/// Runs `run` and returns the instruction-counter delta observed around it
/// together with the closure's own result.
///
/// The delta uses saturating subtraction, so it never underflows.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
159
160impl<C: CanisterKind> DbSession<C> {
161 fn query_plan_cache_scope_id(&self) -> usize {
162 self.db.cache_scope_id()
163 }
164
165 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
166 let scope_id = self.query_plan_cache_scope_id();
167
168 QUERY_PLAN_CACHES.with(|caches| {
169 let mut caches = caches.borrow_mut();
170 let cache = caches.entry(scope_id).or_default();
171
172 f(cache)
173 })
174 }
175
176 const fn visible_indexes_for_model(
177 model: &'static EntityModel,
178 visibility: QueryPlanVisibility,
179 ) -> VisibleIndexes<'static> {
180 match visibility {
181 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
182 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
183 }
184 }
185
/// Test helper: number of entries currently in this session's plan cache.
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
    self.with_query_plan_cache(|plans| plans.len())
}
190
/// Test helper: removes every entry from this session's plan cache.
#[cfg(test)]
pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
    // Passing the method path keeps the call free of a redundant closure.
    self.with_query_plan_cache(QueryPlanCache::clear);
}
195
196 pub(in crate::db) fn query_plan_visibility_for_store_path(
197 &self,
198 store_path: &'static str,
199 ) -> Result<QueryPlanVisibility, QueryError> {
200 let store = self
201 .db
202 .recovered_store(store_path)
203 .map_err(QueryError::execute)?;
204 let visibility = if store.index_state() == crate::db::IndexState::Ready {
205 QueryPlanVisibility::StoreReady
206 } else {
207 QueryPlanVisibility::StoreNotReady
208 };
209
210 Ok(visibility)
211 }
212
213 pub(in crate::db) fn cached_query_plan_entry_for_authority(
214 &self,
215 authority: crate::db::executor::EntityAuthority,
216 schema_fingerprint: CommitSchemaFingerprint,
217 query: &StructuralQuery,
218 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
219 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
220 let cache_key = QueryPlanCacheKey {
221 entity_path: authority.entity_path(),
222 schema_fingerprint,
223 visibility,
224 structural_query: query.structural_cache_key(),
225 };
226
227 {
228 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
229 if let Some(entry) = cached {
230 return Ok((entry, QueryPlanCacheAttribution::hit()));
231 }
232 }
233
234 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
235 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
236 let entry = QueryPlanCacheEntry::new(
237 plan.clone(),
238 SharedPreparedExecutionPlan::from_plan(authority, plan),
239 );
240 self.with_query_plan_cache(|cache| {
241 cache.insert(cache_key, entry.clone());
242 });
243
244 Ok((entry, QueryPlanCacheAttribution::miss()))
245 }
246
247 pub(in crate::db) fn cached_structural_plan_for_authority(
248 &self,
249 authority: crate::db::executor::EntityAuthority,
250 schema_fingerprint: CommitSchemaFingerprint,
251 query: &StructuralQuery,
252 ) -> Result<AccessPlannedQuery, QueryError> {
253 let (entry, _) =
254 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
255
256 Ok(entry.logical_plan().clone())
257 }
258
259 fn with_query_visible_indexes<E, T>(
262 &self,
263 query: &Query<E>,
264 op: impl FnOnce(
265 &Query<E>,
266 &crate::db::query::plan::VisibleIndexes<'static>,
267 ) -> Result<T, QueryError>,
268 ) -> Result<T, QueryError>
269 where
270 E: EntityKind<Canister = C>,
271 {
272 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
273 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
274
275 op(query, &visible_indexes)
276 }
277
278 fn cached_structural_plan_for_entity<E>(
281 &self,
282 query: &StructuralQuery,
283 ) -> Result<AccessPlannedQuery, QueryError>
284 where
285 E: EntityKind<Canister = C>,
286 {
287 self.cached_structural_plan_for_authority(
288 crate::db::executor::EntityAuthority::for_type::<E>(),
289 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
290 query,
291 )
292 }
293
294 fn cached_prepared_query_plan_for_entity<E>(
295 &self,
296 query: &StructuralQuery,
297 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
298 where
299 E: EntityKind<Canister = C>,
300 {
301 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
302 crate::db::executor::EntityAuthority::for_type::<E>(),
303 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
304 query,
305 )?;
306
307 Ok((entry.typed_prepared_plan::<E>(), attribution))
308 }
309
310 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
313 &self,
314 query: &Query<E>,
315 ) -> Result<CompiledQuery<E>, QueryError>
316 where
317 E: EntityKind<Canister = C>,
318 {
319 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
320
321 Ok(Query::<E>::compiled_query_from_plan(plan))
322 }
323
324 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
327 &self,
328 query: &Query<E>,
329 ) -> Result<PlannedQuery<E>, QueryError>
330 where
331 E: EntityKind<Canister = C>,
332 {
333 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
334
335 Ok(Query::<E>::planned_query_from_plan(plan))
336 }
337
338 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
340 &self,
341 query: &Query<E>,
342 ) -> Result<ExplainPlan, QueryError>
343 where
344 E: EntityKind<Canister = C>,
345 {
346 self.with_query_visible_indexes(query, |query, visible_indexes| {
347 query.explain_with_visible_indexes(visible_indexes)
348 })
349 }
350
351 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
354 &self,
355 query: &Query<E>,
356 ) -> Result<String, QueryError>
357 where
358 E: EntityKind<Canister = C>,
359 {
360 self.with_query_visible_indexes(query, |query, visible_indexes| {
361 query.plan_hash_hex_with_visible_indexes(visible_indexes)
362 })
363 }
364
365 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
368 &self,
369 query: &Query<E>,
370 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
371 where
372 E: EntityValue + EntityKind<Canister = C>,
373 {
374 self.with_query_visible_indexes(query, |query, visible_indexes| {
375 query.explain_execution_with_visible_indexes(visible_indexes)
376 })
377 }
378
379 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
382 &self,
383 query: &Query<E>,
384 ) -> Result<String, QueryError>
385 where
386 E: EntityValue + EntityKind<Canister = C>,
387 {
388 self.with_query_visible_indexes(query, |query, visible_indexes| {
389 query.explain_execution_text_with_visible_indexes(visible_indexes)
390 })
391 }
392
393 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
396 &self,
397 query: &Query<E>,
398 ) -> Result<String, QueryError>
399 where
400 E: EntityValue + EntityKind<Canister = C>,
401 {
402 self.with_query_visible_indexes(query, |query, visible_indexes| {
403 query.explain_execution_json_with_visible_indexes(visible_indexes)
404 })
405 }
406
407 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
410 &self,
411 query: &Query<E>,
412 ) -> Result<String, QueryError>
413 where
414 E: EntityValue + EntityKind<Canister = C>,
415 {
416 self.with_query_visible_indexes(query, |query, visible_indexes| {
417 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
418 })
419 }
420
421 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
424 &self,
425 query: &Query<E>,
426 strategy: &S,
427 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
428 where
429 E: EntityValue + EntityKind<Canister = C>,
430 S: PreparedFluentAggregateExplainStrategy,
431 {
432 self.with_query_visible_indexes(query, |query, visible_indexes| {
433 query
434 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
435 })
436 }
437
438 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
441 &self,
442 query: &Query<E>,
443 target_field: &str,
444 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
445 where
446 E: EntityValue + EntityKind<Canister = C>,
447 {
448 self.with_query_visible_indexes(query, |query, visible_indexes| {
449 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
450 })
451 }
452
453 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
456 &self,
457 query: &Query<E>,
458 strategy: &PreparedFluentProjectionStrategy,
459 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
460 where
461 E: EntityValue + EntityKind<Canister = C>,
462 {
463 self.with_query_visible_indexes(query, |query, visible_indexes| {
464 query.explain_prepared_projection_terminal_with_visible_indexes(
465 visible_indexes,
466 strategy,
467 )
468 })
469 }
470
471 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
474 match family {
475 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
476 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
477 )),
478 ExecutionFamily::Ordered => Ok(()),
479 ExecutionFamily::Grouped => Err(QueryError::invariant(
480 "grouped queries execute via execute(), not page().execute()",
481 )),
482 }
483 }
484
485 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
488 match family {
489 ExecutionFamily::Grouped => Ok(()),
490 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
491 "grouped execution requires grouped logical plans",
492 )),
493 }
494 }
495
496 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
498 where
499 E: PersistedRow<Canister = C> + EntityValue,
500 {
501 let mode = query.mode();
503 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
504
505 self.execute_query_dyn(mode, plan)
507 }
508
/// Executes `query` and reports instruction and plan-cache attribution
/// alongside the result.
///
/// Plan resolution and plan execution are measured as two separate stages.
/// The reported total is the saturating sum of the two stage deltas. Grouped
/// queries start from no cursor and have their continuation cursor encoded.
///
/// # Errors
///
/// Fails when the plan cannot be resolved, when execution fails, or when a
/// grouped page yields a continuation token that is not grouped or cannot be
/// encoded.
#[cfg(feature = "perf-attribution")]
#[doc(hidden)]
pub fn execute_query_result_with_attribution<E>(
    &self,
    query: &Query<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // Stage 1: resolve the prepared plan through the plan cache.
    let (compile_local_instructions, prepared) = measure_query_stage(|| {
        self.cached_prepared_query_plan_for_entity::<E>(query.structural())
    });
    let (plan, cache_attribution) = prepared?;

    // Stage 2: run the plan, scalar or grouped.
    let (execute_local_instructions, executed) =
        measure_query_stage(|| -> Result<LoadQueryResult<E>, QueryError> {
            if !query.has_grouping() {
                return self
                    .execute_query_dyn(query.mode(), plan)
                    .map(LoadQueryResult::Rows);
            }

            let (page, trace) = self.execute_grouped_plan_with_trace(plan, None)?;
            let next_cursor = page
                .next_cursor
                .map(|token| match token.as_grouped() {
                    Some(grouped) => grouped.encode().map_err(|err| {
                        QueryError::serialize_internal(format!(
                            "failed to serialize grouped continuation cursor: {err}"
                        ))
                    }),
                    None => Err(QueryError::grouped_paged_emitted_scalar_continuation()),
                })
                .transpose()?;

            Ok(LoadQueryResult::Grouped(
                PagedGroupedExecutionWithTrace::new(page.rows, next_cursor, trace),
            ))
        });
    let result = executed?;

    let attribution = QueryExecutionAttribution {
        compile_local_instructions,
        execute_local_instructions,
        total_local_instructions: compile_local_instructions
            .saturating_add(execute_local_instructions),
        shared_query_plan_cache_hits: cache_attribution.hits,
        shared_query_plan_cache_misses: cache_attribution.misses,
    };

    Ok((result, attribution))
}
576
577 #[doc(hidden)]
580 pub fn execute_query_result<E>(
581 &self,
582 query: &Query<E>,
583 ) -> Result<LoadQueryResult<E>, QueryError>
584 where
585 E: PersistedRow<Canister = C> + EntityValue,
586 {
587 if query.has_grouping() {
588 return self
589 .execute_grouped(query, None)
590 .map(LoadQueryResult::Grouped);
591 }
592
593 self.execute_query(query).map(LoadQueryResult::Rows)
594 }
595
596 #[doc(hidden)]
598 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
599 where
600 E: PersistedRow<Canister = C> + EntityValue,
601 {
602 if !query.mode().is_delete() {
604 return Err(QueryError::unsupported_query(
605 "delete count execution requires delete query mode",
606 ));
607 }
608
609 let plan = self
611 .compile_query_with_visible_indexes(query)?
612 .into_prepared_execution_plan();
613
614 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
616 .map_err(QueryError::execute)
617 }
618
619 pub(in crate::db) fn execute_query_dyn<E>(
624 &self,
625 mode: QueryMode,
626 plan: PreparedExecutionPlan<E>,
627 ) -> Result<EntityResponse<E>, QueryError>
628 where
629 E: PersistedRow<Canister = C> + EntityValue,
630 {
631 let result = match mode {
632 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
633 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
634 };
635
636 result.map_err(QueryError::execute)
637 }
638
639 pub(in crate::db) fn execute_load_query_with<E, T>(
642 &self,
643 query: &Query<E>,
644 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
645 ) -> Result<T, QueryError>
646 where
647 E: PersistedRow<Canister = C> + EntityValue,
648 {
649 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
650
651 self.with_metrics(|| op(self.load_executor::<E>(), plan))
652 .map_err(QueryError::execute)
653 }
654
655 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
660 where
661 E: EntityKind<Canister = C>,
662 {
663 let compiled = self.compile_query_with_visible_indexes(query)?;
664 let explain = compiled.explain();
665 let plan_hash = compiled.plan_hash_hex();
666
667 let (executable, _) =
668 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
669 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
670 let execution_family = match query.mode() {
671 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
672 QueryMode::Delete(_) => None,
673 };
674
675 Ok(QueryTracePlan::new(
676 plan_hash,
677 access_strategy,
678 execution_family,
679 explain,
680 ))
681 }
682
683 pub(crate) fn execute_load_query_paged_with_trace<E>(
685 &self,
686 query: &Query<E>,
687 cursor_token: Option<&str>,
688 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
689 where
690 E: PersistedRow<Canister = C> + EntityValue,
691 {
692 let plan = self
694 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
695 .0;
696 Self::ensure_scalar_paged_execution_family(
697 plan.execution_family().map_err(QueryError::execute)?,
698 )?;
699
700 let cursor_bytes = decode_optional_cursor_token(cursor_token)
702 .map_err(QueryError::from_cursor_plan_error)?;
703 let cursor = plan
704 .prepare_cursor(cursor_bytes.as_deref())
705 .map_err(QueryError::from_executor_plan_error)?;
706
707 let (page, trace) = self
709 .with_metrics(|| {
710 self.load_executor::<E>()
711 .execute_paged_with_cursor_traced(plan, cursor)
712 })
713 .map_err(QueryError::execute)?;
714 let next_cursor = page
715 .next_cursor
716 .map(|token| {
717 let Some(token) = token.as_scalar() else {
718 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
719 };
720
721 token.encode().map_err(|err| {
722 QueryError::serialize_internal(format!(
723 "failed to serialize continuation cursor: {err}"
724 ))
725 })
726 })
727 .transpose()?;
728
729 Ok(PagedLoadExecutionWithTrace::new(
730 page.items,
731 next_cursor,
732 trace,
733 ))
734 }
735
736 pub(in crate::db) fn execute_grouped<E>(
741 &self,
742 query: &Query<E>,
743 cursor_token: Option<&str>,
744 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
745 where
746 E: PersistedRow<Canister = C> + EntityValue,
747 {
748 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
749 let next_cursor = page
750 .next_cursor
751 .map(|token| {
752 let Some(token) = token.as_grouped() else {
753 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
754 };
755
756 token.encode().map_err(|err| {
757 QueryError::serialize_internal(format!(
758 "failed to serialize grouped continuation cursor: {err}"
759 ))
760 })
761 })
762 .transpose()?;
763
764 Ok(PagedGroupedExecutionWithTrace::new(
765 page.rows,
766 next_cursor,
767 trace,
768 ))
769 }
770
771 fn execute_grouped_page_with_trace<E>(
774 &self,
775 query: &Query<E>,
776 cursor_token: Option<&str>,
777 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
778 where
779 E: PersistedRow<Canister = C> + EntityValue,
780 {
781 let plan = self
783 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
784 .0;
785
786 self.execute_grouped_plan_with_trace(plan, cursor_token)
788 }
789
790 fn execute_grouped_plan_with_trace<E>(
792 &self,
793 plan: PreparedExecutionPlan<E>,
794 cursor_token: Option<&str>,
795 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
796 where
797 E: PersistedRow<Canister = C> + EntityValue,
798 {
799 Self::ensure_grouped_execution_family(
801 plan.execution_family().map_err(QueryError::execute)?,
802 )?;
803
804 let cursor = decode_optional_grouped_cursor_token(cursor_token)
806 .map_err(QueryError::from_cursor_plan_error)?;
807 let cursor = plan
808 .prepare_grouped_cursor_token(cursor)
809 .map_err(QueryError::from_executor_plan_error)?;
810
811 self.with_metrics(|| {
814 self.load_executor::<E>()
815 .execute_grouped_paged_with_cursor_traced(plan, cursor)
816 })
817 .map_err(QueryError::execute)
818 }
819}