1#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10 db::{
11 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13 access::AccessStrategy,
14 commit::CommitSchemaFingerprint,
15 cursor::{
16 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17 },
18 diagnostics::ExecutionTrace,
19 executor::{
20 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21 SharedPreparedExecutionPlan,
22 },
23 query::builder::{
24 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25 },
26 query::explain::{
27 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28 },
29 query::{
30 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32 },
33 },
34 error::InternalError,
35 model::entity::EntityModel,
36 traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
45type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
47const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
51#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) enum QueryPlanVisibility {
53 StoreNotReady,
54 StoreReady,
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
58pub(in crate::db) struct QueryPlanCacheKey {
59 cache_method_version: u8,
60 entity_path: &'static str,
61 schema_fingerprint: CommitSchemaFingerprint,
62 visibility: QueryPlanVisibility,
63 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
64}
65
66#[derive(Clone, Debug)]
67pub(in crate::db) struct QueryPlanCacheEntry {
68 logical_plan: AccessPlannedQuery,
69 prepared_plan: SharedPreparedExecutionPlan,
70}
71
72impl QueryPlanCacheEntry {
73 #[must_use]
74 pub(in crate::db) const fn new(
75 logical_plan: AccessPlannedQuery,
76 prepared_plan: SharedPreparedExecutionPlan,
77 ) -> Self {
78 Self {
79 logical_plan,
80 prepared_plan,
81 }
82 }
83
84 #[must_use]
85 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
86 &self.logical_plan
87 }
88
89 #[must_use]
90 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
91 self.prepared_plan.typed_clone::<E>()
92 }
93}
94
95pub(in crate::db) type QueryPlanCache =
96 HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
97
98thread_local! {
99 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
104 RefCell::new(HashMap::default());
105}
106
107#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
108pub(in crate::db) struct QueryPlanCacheAttribution {
109 pub hits: u64,
110 pub misses: u64,
111}
112
113impl QueryPlanCacheAttribution {
114 #[must_use]
115 const fn hit() -> Self {
116 Self { hits: 1, misses: 0 }
117 }
118
119 #[must_use]
120 const fn miss() -> Self {
121 Self { hits: 0, misses: 1 }
122 }
123}
124
/// Instruction-count and cache attribution reported for one attributed query execution.
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Counter delta measured around the prepared-plan lookup.
    pub compile_local_instructions: u64,
    /// Runtime portion reported by the scalar execute phase attribution.
    pub runtime_local_instructions: u64,
    /// Finalize portion reported by the scalar execute phase attribution.
    pub finalize_local_instructions: u64,
    /// Response-decode instructions reported by the load executor (zero on other paths).
    pub response_decode_local_instructions: u64,
    /// Counter delta measured around the whole execution stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute deltas.
    pub total_local_instructions: u64,
    /// Plan-cache hits recorded for this execution.
    pub shared_query_plan_cache_hits: u64,
    /// Plan-cache misses recorded for this execution.
    pub shared_query_plan_cache_misses: u64,
}
143
/// Reads the instruction counter used for query attribution.
///
/// On `wasm32` this queries performance counter type `1` from the CDK; on every
/// other target it returns `0`, so measured deltas are always zero there.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    // Exactly one of the two blocks below survives conditional compilation.
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
160
/// Runs `run` and returns the instruction-counter delta it consumed together with its result.
///
/// The delta is computed with saturating subtraction, so it never underflows.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
169
170impl<C: CanisterKind> DbSession<C> {
171 #[cfg(feature = "perf-attribution")]
172 const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
173 ScalarExecutePhaseAttribution {
174 runtime_local_instructions: 0,
175 finalize_local_instructions: 0,
176 }
177 }
178
179 fn query_plan_cache_scope_id(&self) -> usize {
180 self.db.cache_scope_id()
181 }
182
183 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
184 let scope_id = self.query_plan_cache_scope_id();
185
186 QUERY_PLAN_CACHES.with(|caches| {
187 let mut caches = caches.borrow_mut();
188 let cache = caches.entry(scope_id).or_default();
189
190 f(cache)
191 })
192 }
193
194 const fn visible_indexes_for_model(
195 model: &'static EntityModel,
196 visibility: QueryPlanVisibility,
197 ) -> VisibleIndexes<'static> {
198 match visibility {
199 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
200 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
201 }
202 }
203
204 #[cfg(test)]
205 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
206 self.with_query_plan_cache(|cache| cache.len())
207 }
208
209 #[cfg(test)]
210 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
211 self.with_query_plan_cache(QueryPlanCache::clear);
212 }
213
214 pub(in crate::db) fn query_plan_visibility_for_store_path(
215 &self,
216 store_path: &'static str,
217 ) -> Result<QueryPlanVisibility, QueryError> {
218 let store = self
219 .db
220 .recovered_store(store_path)
221 .map_err(QueryError::execute)?;
222 let visibility = if store.index_state() == crate::db::IndexState::Ready {
223 QueryPlanVisibility::StoreReady
224 } else {
225 QueryPlanVisibility::StoreNotReady
226 };
227
228 Ok(visibility)
229 }
230
231 pub(in crate::db) fn cached_query_plan_entry_for_authority(
232 &self,
233 authority: crate::db::executor::EntityAuthority,
234 schema_fingerprint: CommitSchemaFingerprint,
235 query: &StructuralQuery,
236 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
237 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
238 let cache_key =
239 QueryPlanCacheKey::for_authority(authority, schema_fingerprint, visibility, query);
240
241 {
242 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
243 if let Some(entry) = cached {
244 return Ok((entry, QueryPlanCacheAttribution::hit()));
245 }
246 }
247
248 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
249 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
250 let entry = QueryPlanCacheEntry::new(
251 plan.clone(),
252 SharedPreparedExecutionPlan::from_plan(authority, plan),
253 );
254 self.with_query_plan_cache(|cache| {
255 cache.insert(cache_key, entry.clone());
256 });
257
258 Ok((entry, QueryPlanCacheAttribution::miss()))
259 }
260
261 #[cfg(test)]
262 pub(in crate::db) fn query_plan_cache_key_for_tests(
263 authority: crate::db::executor::EntityAuthority,
264 schema_fingerprint: CommitSchemaFingerprint,
265 visibility: QueryPlanVisibility,
266 query: &StructuralQuery,
267 cache_method_version: u8,
268 ) -> QueryPlanCacheKey {
269 QueryPlanCacheKey::for_authority_with_method_version(
270 authority,
271 schema_fingerprint,
272 visibility,
273 query,
274 cache_method_version,
275 )
276 }
277
278 pub(in crate::db) fn cached_structural_plan_for_authority(
279 &self,
280 authority: crate::db::executor::EntityAuthority,
281 schema_fingerprint: CommitSchemaFingerprint,
282 query: &StructuralQuery,
283 ) -> Result<AccessPlannedQuery, QueryError> {
284 let (entry, _) =
285 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
286
287 Ok(entry.logical_plan().clone())
288 }
289
290 fn with_query_visible_indexes<E, T>(
293 &self,
294 query: &Query<E>,
295 op: impl FnOnce(
296 &Query<E>,
297 &crate::db::query::plan::VisibleIndexes<'static>,
298 ) -> Result<T, QueryError>,
299 ) -> Result<T, QueryError>
300 where
301 E: EntityKind<Canister = C>,
302 {
303 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
304 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
305
306 op(query, &visible_indexes)
307 }
308
309 fn cached_structural_plan_for_entity<E>(
312 &self,
313 query: &StructuralQuery,
314 ) -> Result<AccessPlannedQuery, QueryError>
315 where
316 E: EntityKind<Canister = C>,
317 {
318 self.cached_structural_plan_for_authority(
319 crate::db::executor::EntityAuthority::for_type::<E>(),
320 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
321 query,
322 )
323 }
324
325 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
326 &self,
327 query: &StructuralQuery,
328 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
329 where
330 E: EntityKind<Canister = C>,
331 {
332 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
333 crate::db::executor::EntityAuthority::for_type::<E>(),
334 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
335 query,
336 )?;
337
338 Ok((entry.typed_prepared_plan::<E>(), attribution))
339 }
340
341 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
344 &self,
345 query: &Query<E>,
346 ) -> Result<CompiledQuery<E>, QueryError>
347 where
348 E: EntityKind<Canister = C>,
349 {
350 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
351
352 Ok(Query::<E>::compiled_query_from_plan(plan))
353 }
354
355 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
358 &self,
359 query: &Query<E>,
360 ) -> Result<PlannedQuery<E>, QueryError>
361 where
362 E: EntityKind<Canister = C>,
363 {
364 let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
365
366 Ok(Query::<E>::planned_query_from_plan(plan))
367 }
368
369 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
371 &self,
372 query: &Query<E>,
373 ) -> Result<ExplainPlan, QueryError>
374 where
375 E: EntityKind<Canister = C>,
376 {
377 self.with_query_visible_indexes(query, |query, visible_indexes| {
378 query.explain_with_visible_indexes(visible_indexes)
379 })
380 }
381
382 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
385 &self,
386 query: &Query<E>,
387 ) -> Result<String, QueryError>
388 where
389 E: EntityKind<Canister = C>,
390 {
391 self.with_query_visible_indexes(query, |query, visible_indexes| {
392 query.plan_hash_hex_with_visible_indexes(visible_indexes)
393 })
394 }
395
396 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
399 &self,
400 query: &Query<E>,
401 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
402 where
403 E: EntityValue + EntityKind<Canister = C>,
404 {
405 self.with_query_visible_indexes(query, |query, visible_indexes| {
406 query.explain_execution_with_visible_indexes(visible_indexes)
407 })
408 }
409
410 pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
413 &self,
414 query: &Query<E>,
415 ) -> Result<String, QueryError>
416 where
417 E: EntityValue + EntityKind<Canister = C>,
418 {
419 self.with_query_visible_indexes(query, |query, visible_indexes| {
420 query.explain_execution_text_with_visible_indexes(visible_indexes)
421 })
422 }
423
424 pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
427 &self,
428 query: &Query<E>,
429 ) -> Result<String, QueryError>
430 where
431 E: EntityValue + EntityKind<Canister = C>,
432 {
433 self.with_query_visible_indexes(query, |query, visible_indexes| {
434 query.explain_execution_json_with_visible_indexes(visible_indexes)
435 })
436 }
437
438 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
441 &self,
442 query: &Query<E>,
443 ) -> Result<String, QueryError>
444 where
445 E: EntityValue + EntityKind<Canister = C>,
446 {
447 self.with_query_visible_indexes(query, |query, visible_indexes| {
448 query.explain_execution_verbose_with_visible_indexes(visible_indexes)
449 })
450 }
451
452 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
455 &self,
456 query: &Query<E>,
457 strategy: &S,
458 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
459 where
460 E: EntityValue + EntityKind<Canister = C>,
461 S: PreparedFluentAggregateExplainStrategy,
462 {
463 self.with_query_visible_indexes(query, |query, visible_indexes| {
464 query
465 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
466 })
467 }
468
469 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
472 &self,
473 query: &Query<E>,
474 target_field: &str,
475 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
476 where
477 E: EntityValue + EntityKind<Canister = C>,
478 {
479 self.with_query_visible_indexes(query, |query, visible_indexes| {
480 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
481 })
482 }
483
484 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
487 &self,
488 query: &Query<E>,
489 strategy: &PreparedFluentProjectionStrategy,
490 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
491 where
492 E: EntityValue + EntityKind<Canister = C>,
493 {
494 self.with_query_visible_indexes(query, |query, visible_indexes| {
495 query.explain_prepared_projection_terminal_with_visible_indexes(
496 visible_indexes,
497 strategy,
498 )
499 })
500 }
501
502 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
505 match family {
506 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
507 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
508 )),
509 ExecutionFamily::Ordered => Ok(()),
510 ExecutionFamily::Grouped => Err(QueryError::invariant(
511 "grouped queries execute via execute(), not page().execute()",
512 )),
513 }
514 }
515
516 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
519 match family {
520 ExecutionFamily::Grouped => Ok(()),
521 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
522 "grouped execution requires grouped logical plans",
523 )),
524 }
525 }
526
527 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
529 where
530 E: PersistedRow<Canister = C> + EntityValue,
531 {
532 let mode = query.mode();
534 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
535
536 self.execute_query_dyn(mode, plan)
538 }
539
540 #[cfg(feature = "perf-attribution")]
543 #[doc(hidden)]
544 pub fn execute_query_result_with_attribution<E>(
545 &self,
546 query: &Query<E>,
547 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
548 where
549 E: PersistedRow<Canister = C> + EntityValue,
550 {
551 let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
556 self.cached_prepared_query_plan_for_entity::<E>(query.structural())
557 });
558 let (plan, cache_attribution) = plan_and_cache?;
559
560 let (execute_local_instructions, result) = measure_query_stage(|| {
563 if query.has_grouping() {
564 self.execute_grouped_plan_with_trace(plan, None)
565 .map(|(page, trace)| {
566 let next_cursor = page
567 .next_cursor
568 .map(|token| {
569 let Some(token) = token.as_grouped() else {
570 return Err(
571 QueryError::grouped_paged_emitted_scalar_continuation(),
572 );
573 };
574
575 token.encode().map_err(|err| {
576 QueryError::serialize_internal(format!(
577 "failed to serialize grouped continuation cursor: {err}"
578 ))
579 })
580 })
581 .transpose()?;
582
583 Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
584 (
585 LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
586 page.rows,
587 next_cursor,
588 trace,
589 )),
590 Self::empty_scalar_execute_phase_attribution(),
591 0,
592 ),
593 )
594 })?
595 } else {
596 match query.mode() {
597 QueryMode::Load(_) => {
598 let (rows, phase_attribution, response_decode_local_instructions) = self
599 .load_executor::<E>()
600 .execute_with_phase_attribution(plan)
601 .map_err(QueryError::execute)?;
602
603 Ok::<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>(
604 (
605 LoadQueryResult::Rows(rows),
606 phase_attribution,
607 response_decode_local_instructions,
608 ),
609 )
610 }
611 QueryMode::Delete(_) => {
612 let result = self.execute_query_dyn(query.mode(), plan)?;
613
614 Ok((
615 LoadQueryResult::Rows(result),
616 Self::empty_scalar_execute_phase_attribution(),
617 0,
618 ))
619 }
620 }
621 }
622 });
623 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
624 let total_local_instructions =
625 compile_local_instructions.saturating_add(execute_local_instructions);
626
627 Ok((
628 result,
629 QueryExecutionAttribution {
630 compile_local_instructions,
631 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
632 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
633 response_decode_local_instructions,
634 execute_local_instructions,
635 total_local_instructions,
636 shared_query_plan_cache_hits: cache_attribution.hits,
637 shared_query_plan_cache_misses: cache_attribution.misses,
638 },
639 ))
640 }
641
642 #[doc(hidden)]
645 pub fn execute_query_result<E>(
646 &self,
647 query: &Query<E>,
648 ) -> Result<LoadQueryResult<E>, QueryError>
649 where
650 E: PersistedRow<Canister = C> + EntityValue,
651 {
652 if query.has_grouping() {
653 return self
654 .execute_grouped(query, None)
655 .map(LoadQueryResult::Grouped);
656 }
657
658 self.execute_query(query).map(LoadQueryResult::Rows)
659 }
660
661 #[doc(hidden)]
663 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
664 where
665 E: PersistedRow<Canister = C> + EntityValue,
666 {
667 if !query.mode().is_delete() {
669 return Err(QueryError::unsupported_query(
670 "delete count execution requires delete query mode",
671 ));
672 }
673
674 let plan = self
676 .compile_query_with_visible_indexes(query)?
677 .into_prepared_execution_plan();
678
679 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
681 .map_err(QueryError::execute)
682 }
683
684 pub(in crate::db) fn execute_query_dyn<E>(
689 &self,
690 mode: QueryMode,
691 plan: PreparedExecutionPlan<E>,
692 ) -> Result<EntityResponse<E>, QueryError>
693 where
694 E: PersistedRow<Canister = C> + EntityValue,
695 {
696 let result = match mode {
697 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
698 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
699 };
700
701 result.map_err(QueryError::execute)
702 }
703
704 pub(in crate::db) fn execute_load_query_with<E, T>(
707 &self,
708 query: &Query<E>,
709 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
710 ) -> Result<T, QueryError>
711 where
712 E: PersistedRow<Canister = C> + EntityValue,
713 {
714 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
715
716 self.with_metrics(|| op(self.load_executor::<E>(), plan))
717 .map_err(QueryError::execute)
718 }
719
720 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
725 where
726 E: EntityKind<Canister = C>,
727 {
728 let compiled = self.compile_query_with_visible_indexes(query)?;
729 let explain = compiled.explain();
730 let plan_hash = compiled.plan_hash_hex();
731
732 let (executable, _) =
733 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
734 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
735 let execution_family = match query.mode() {
736 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
737 QueryMode::Delete(_) => None,
738 };
739
740 Ok(QueryTracePlan::new(
741 plan_hash,
742 access_strategy,
743 execution_family,
744 explain,
745 ))
746 }
747
748 pub(crate) fn execute_load_query_paged_with_trace<E>(
750 &self,
751 query: &Query<E>,
752 cursor_token: Option<&str>,
753 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
754 where
755 E: PersistedRow<Canister = C> + EntityValue,
756 {
757 let plan = self
759 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
760 .0;
761 Self::ensure_scalar_paged_execution_family(
762 plan.execution_family().map_err(QueryError::execute)?,
763 )?;
764
765 let cursor_bytes = decode_optional_cursor_token(cursor_token)
767 .map_err(QueryError::from_cursor_plan_error)?;
768 let cursor = plan
769 .prepare_cursor(cursor_bytes.as_deref())
770 .map_err(QueryError::from_executor_plan_error)?;
771
772 let (page, trace) = self
774 .with_metrics(|| {
775 self.load_executor::<E>()
776 .execute_paged_with_cursor_traced(plan, cursor)
777 })
778 .map_err(QueryError::execute)?;
779 let next_cursor = page
780 .next_cursor
781 .map(|token| {
782 let Some(token) = token.as_scalar() else {
783 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
784 };
785
786 token.encode().map_err(|err| {
787 QueryError::serialize_internal(format!(
788 "failed to serialize continuation cursor: {err}"
789 ))
790 })
791 })
792 .transpose()?;
793
794 Ok(PagedLoadExecutionWithTrace::new(
795 page.items,
796 next_cursor,
797 trace,
798 ))
799 }
800
801 pub(in crate::db) fn execute_grouped<E>(
806 &self,
807 query: &Query<E>,
808 cursor_token: Option<&str>,
809 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
810 where
811 E: PersistedRow<Canister = C> + EntityValue,
812 {
813 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
814 let next_cursor = page
815 .next_cursor
816 .map(|token| {
817 let Some(token) = token.as_grouped() else {
818 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
819 };
820
821 token.encode().map_err(|err| {
822 QueryError::serialize_internal(format!(
823 "failed to serialize grouped continuation cursor: {err}"
824 ))
825 })
826 })
827 .transpose()?;
828
829 Ok(PagedGroupedExecutionWithTrace::new(
830 page.rows,
831 next_cursor,
832 trace,
833 ))
834 }
835
836 fn execute_grouped_page_with_trace<E>(
839 &self,
840 query: &Query<E>,
841 cursor_token: Option<&str>,
842 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
843 where
844 E: PersistedRow<Canister = C> + EntityValue,
845 {
846 let plan = self
848 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
849 .0;
850
851 self.execute_grouped_plan_with_trace(plan, cursor_token)
853 }
854
855 fn execute_grouped_plan_with_trace<E>(
857 &self,
858 plan: PreparedExecutionPlan<E>,
859 cursor_token: Option<&str>,
860 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
861 where
862 E: PersistedRow<Canister = C> + EntityValue,
863 {
864 Self::ensure_grouped_execution_family(
866 plan.execution_family().map_err(QueryError::execute)?,
867 )?;
868
869 let cursor = decode_optional_grouped_cursor_token(cursor_token)
871 .map_err(QueryError::from_cursor_plan_error)?;
872 let cursor = plan
873 .prepare_grouped_cursor_token(cursor)
874 .map_err(QueryError::from_executor_plan_error)?;
875
876 self.with_metrics(|| {
879 self.load_executor::<E>()
880 .execute_grouped_paged_with_cursor_traced(plan, cursor)
881 })
882 .map_err(QueryError::execute)
883 }
884}
885
886impl QueryPlanCacheKey {
887 fn for_authority(
888 authority: crate::db::executor::EntityAuthority,
889 schema_fingerprint: CommitSchemaFingerprint,
890 visibility: QueryPlanVisibility,
891 query: &StructuralQuery,
892 ) -> Self {
893 Self::for_authority_with_method_version(
894 authority,
895 schema_fingerprint,
896 visibility,
897 query,
898 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
899 )
900 }
901
902 fn for_authority_with_method_version(
903 authority: crate::db::executor::EntityAuthority,
904 schema_fingerprint: CommitSchemaFingerprint,
905 visibility: QueryPlanVisibility,
906 query: &StructuralQuery,
907 cache_method_version: u8,
908 ) -> Self {
909 Self {
910 cache_method_version,
911 entity_path: authority.entity_path(),
912 schema_fingerprint,
913 visibility,
914 structural_query: query.structural_cache_key(),
915 }
916 }
917}