1#[cfg(feature = "perf-attribution")]
8use crate::db::executor::ScalarExecutePhaseAttribution;
9use crate::{
10 db::{
11 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13 access::AccessStrategy,
14 commit::CommitSchemaFingerprint,
15 cursor::{
16 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17 },
18 diagnostics::ExecutionTrace,
19 executor::{
20 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21 SharedPreparedExecutionPlan,
22 },
23 query::builder::{
24 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25 },
26 query::explain::{
27 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28 },
29 query::{
30 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32 },
33 },
34 error::InternalError,
35 model::entity::EntityModel,
36 traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
/// Hasher builder used by the query-plan cache maps in this module (`Xxh3`-backed).
type CacheBuildHasher = BuildHasherDefault<Xxh3>;

/// Method-version tag written into every `QueryPlanCacheKey` built for the shared cache.
// NOTE(review): presumably bumped whenever cache-key construction changes so keys
// produced by an older method can never match — confirm with the cache owners.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
/// Index visibility a query plan is built (and cached) under.
///
/// The value is part of `QueryPlanCacheKey`, so plans built under different
/// visibilities never share a cache entry.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
    /// The store's index state is not `Ready`; planning sees no indexes.
    StoreNotReady,
    /// The store's index state is `Ready`; planning sees the model's planner-visible indexes.
    StoreReady,
}
56
/// Identity of one cached query plan.
///
/// Two lookups resolve to the same cache entry only when every field compares equal.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
    /// Version tag of the method used to construct the key.
    cache_method_version: u8,
    /// Entity path reported by the `EntityAuthority` the key was built for.
    entity_path: &'static str,
    /// Schema fingerprint supplied by the caller when the key was built.
    schema_fingerprint: CommitSchemaFingerprint,
    /// Index visibility the plan is built under.
    visibility: QueryPlanVisibility,
    /// Structural cache key derived from the query itself.
    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
65
/// Value stored in the query-plan cache: a logical plan together with the shared
/// prepared execution plan held alongside it.
#[derive(Clone, Debug)]
pub(in crate::db) struct QueryPlanCacheEntry {
    /// Logical access-planned query.
    logical_plan: AccessPlannedQuery,
    /// Shared prepared plan from which typed per-entity plans are cloned.
    prepared_plan: SharedPreparedExecutionPlan,
}
71
impl QueryPlanCacheEntry {
    /// Bundles a logical plan with its shared prepared execution plan.
    #[must_use]
    pub(in crate::db) const fn new(
        logical_plan: AccessPlannedQuery,
        prepared_plan: SharedPreparedExecutionPlan,
    ) -> Self {
        Self {
            logical_plan,
            prepared_plan,
        }
    }

    /// Borrows the logical plan held by this entry.
    #[must_use]
    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
        &self.logical_plan
    }

    /// Produces an owned prepared plan typed for entity `E` by cloning from the
    /// shared prepared plan.
    #[must_use]
    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
        self.prepared_plan.typed_clone::<E>()
    }

    /// Borrows the shared prepared plan held by this entry.
    #[must_use]
    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
        &self.prepared_plan
    }
}
99
/// Map from plan cache key to cached plan entry, hashed with `CacheBuildHasher`.
pub(in crate::db) type QueryPlanCache =
    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
102
thread_local! {
    // Thread-local registry holding one `QueryPlanCache` per `usize` scope id.
    // `DbSession::with_query_plan_cache` indexes it with the session's cache scope id
    // and creates the per-scope cache on first access.
    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
111
/// Hit/miss counters describing how a plan-cache lookup was served.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
    /// Number of lookups answered from the cache.
    pub hits: u64,
    /// Number of lookups that had to build and insert a plan.
    pub misses: u64,
}
117
impl QueryPlanCacheAttribution {
    /// Attribution for a single lookup served from the cache (`hits = 1`, `misses = 0`).
    #[must_use]
    const fn hit() -> Self {
        Self { hits: 1, misses: 0 }
    }

    /// Attribution for a single lookup that missed the cache (`hits = 0`, `misses = 1`).
    #[must_use]
    const fn miss() -> Self {
        Self { hits: 0, misses: 1 }
    }
}
129
/// Instruction-count breakdown returned by
/// `DbSession::execute_query_result_with_attribution`.
///
/// Only available with the `perf-attribution` feature.
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Instructions measured around the cached prepared-plan lookup.
    pub compile_local_instructions: u64,
    // The nine fields below are copied from the executor's
    // `ScalarExecutePhaseAttribution`; they are all zero for grouped queries and
    // for delete-mode queries, which report an empty phase attribution.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    /// Response-decode instructions reported by the load executor; zero for grouped
    /// and delete-mode queries.
    pub response_decode_local_instructions: u64,
    /// Instructions measured around the whole execute stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
    pub total_local_instructions: u64,
    /// Plan-cache hits recorded by the plan lookup.
    pub shared_query_plan_cache_hits: u64,
    /// Plan-cache misses recorded by the plan lookup.
    pub shared_query_plan_cache_misses: u64,
}
155
/// Reads the instruction counter used for query perf attribution.
///
/// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`; on every other
/// target it returns `0`, so measured deltas are always zero off-wasm.
// The lint expectation is scoped to non-wasm targets: only there is the body a plain
// constant that `clippy::missing_const_for_fn` would flag. On wasm32 the body calls a
// non-const function, the lint cannot fire, and an unconditional `#[expect]` would
// itself be reported as an unfulfilled lint expectation.
#[cfg(feature = "perf-attribution")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
172
/// Runs `run` and reports how many instructions it consumed.
///
/// Returns the counter delta (saturating at zero) together with the untouched
/// result of `run`, so callers can record the cost even when the stage failed.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
181
182impl<C: CanisterKind> DbSession<C> {
183 #[cfg(feature = "perf-attribution")]
184 const fn empty_scalar_execute_phase_attribution() -> ScalarExecutePhaseAttribution {
185 ScalarExecutePhaseAttribution {
186 runtime_local_instructions: 0,
187 finalize_local_instructions: 0,
188 direct_data_row_scan_local_instructions: 0,
189 direct_data_row_key_stream_local_instructions: 0,
190 direct_data_row_row_read_local_instructions: 0,
191 direct_data_row_key_encode_local_instructions: 0,
192 direct_data_row_store_get_local_instructions: 0,
193 direct_data_row_order_window_local_instructions: 0,
194 direct_data_row_page_window_local_instructions: 0,
195 }
196 }
197
    /// Returns the cache scope id of the underlying database handle; it selects which
    /// per-scope plan cache this session reads and writes.
    fn query_plan_cache_scope_id(&self) -> usize {
        self.db.cache_scope_id()
    }
201
202 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
203 let scope_id = self.query_plan_cache_scope_id();
204
205 QUERY_PLAN_CACHES.with(|caches| {
206 let mut caches = caches.borrow_mut();
207 let cache = caches.entry(scope_id).or_default();
208
209 f(cache)
210 })
211 }
212
    /// Maps a visibility state to the set of indexes planning may use for `model`:
    /// the model's planner-visible indexes when the store is ready, none otherwise.
    const fn visible_indexes_for_model(
        model: &'static EntityModel,
        visibility: QueryPlanVisibility,
    ) -> VisibleIndexes<'static> {
        match visibility {
            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
        }
    }
222
    /// Test helper: number of entries currently held in this session's plan cache.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
        self.with_query_plan_cache(|cache| cache.len())
    }
227
    /// Test helper: removes every entry from this session's plan cache.
    #[cfg(test)]
    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
        self.with_query_plan_cache(QueryPlanCache::clear);
    }
232
233 pub(in crate::db) fn query_plan_visibility_for_store_path(
234 &self,
235 store_path: &'static str,
236 ) -> Result<QueryPlanVisibility, QueryError> {
237 let store = self
238 .db
239 .recovered_store(store_path)
240 .map_err(QueryError::execute)?;
241 let visibility = if store.index_state() == crate::db::IndexState::Ready {
242 QueryPlanVisibility::StoreReady
243 } else {
244 QueryPlanVisibility::StoreNotReady
245 };
246
247 Ok(visibility)
248 }
249
250 pub(in crate::db) fn cached_query_plan_entry_for_authority(
251 &self,
252 authority: crate::db::executor::EntityAuthority,
253 schema_fingerprint: CommitSchemaFingerprint,
254 query: &StructuralQuery,
255 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
256 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
257 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
258 let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
259 let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
260 authority,
261 schema_fingerprint,
262 visibility,
263 query,
264 normalized_predicate.as_ref(),
265 );
266
267 {
268 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
269 if let Some(entry) = cached {
270 return Ok((entry, QueryPlanCacheAttribution::hit()));
271 }
272 }
273
274 let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
275 &visible_indexes,
276 normalized_predicate,
277 )?;
278 let entry = QueryPlanCacheEntry::new(
279 plan.clone(),
280 SharedPreparedExecutionPlan::from_plan(authority, plan),
281 );
282 self.with_query_plan_cache(|cache| {
283 cache.insert(cache_key, entry.clone());
284 });
285
286 Ok((entry, QueryPlanCacheAttribution::miss()))
287 }
288
    /// Test helper: builds a cache key from the query's plain structural cache key
    /// with an explicit `cache_method_version`.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_key_for_tests(
        authority: crate::db::executor::EntityAuthority,
        schema_fingerprint: CommitSchemaFingerprint,
        visibility: QueryPlanVisibility,
        query: &StructuralQuery,
        cache_method_version: u8,
    ) -> QueryPlanCacheKey {
        QueryPlanCacheKey::for_authority_with_method_version(
            authority,
            schema_fingerprint,
            visibility,
            query,
            cache_method_version,
        )
    }
305
306 pub(in crate::db) fn cached_structural_plan_for_authority(
307 &self,
308 authority: crate::db::executor::EntityAuthority,
309 schema_fingerprint: CommitSchemaFingerprint,
310 query: &StructuralQuery,
311 ) -> Result<AccessPlannedQuery, QueryError> {
312 let (entry, _) =
313 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
314
315 Ok(entry.logical_plan().clone())
316 }
317
318 fn with_query_visible_indexes<E, T>(
321 &self,
322 query: &Query<E>,
323 op: impl FnOnce(
324 &Query<E>,
325 &crate::db::query::plan::VisibleIndexes<'static>,
326 ) -> Result<T, QueryError>,
327 ) -> Result<T, QueryError>
328 where
329 E: EntityKind<Canister = C>,
330 {
331 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
332 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
333
334 op(query, &visible_indexes)
335 }
336
337 fn cached_structural_plan_for_entity<E>(
340 &self,
341 query: &StructuralQuery,
342 ) -> Result<AccessPlannedQuery, QueryError>
343 where
344 E: EntityKind<Canister = C>,
345 {
346 self.cached_structural_plan_for_authority(
347 crate::db::executor::EntityAuthority::for_type::<E>(),
348 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
349 query,
350 )
351 }
352
353 fn map_cached_structural_plan_for_entity<E, T>(
357 &self,
358 query: &StructuralQuery,
359 map: impl FnOnce(AccessPlannedQuery) -> T,
360 ) -> Result<T, QueryError>
361 where
362 E: EntityKind<Canister = C>,
363 {
364 let plan = self.cached_structural_plan_for_entity::<E>(query)?;
365
366 Ok(map(plan))
367 }
368
369 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
370 &self,
371 query: &StructuralQuery,
372 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
373 where
374 E: EntityKind<Canister = C>,
375 {
376 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
377 crate::db::executor::EntityAuthority::for_type::<E>(),
378 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
379 query,
380 )?;
381
382 Ok((entry.typed_prepared_plan::<E>(), attribution))
383 }
384
    /// Compiles `query` by fetching its logical plan through the plan cache and
    /// converting it with `Query::compiled_query_from_plan`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the cache lookup or from planning.
    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<CompiledQuery<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.map_cached_structural_plan_for_entity::<E, _>(
            query.structural(),
            Query::<E>::compiled_query_from_plan,
        )
    }
399
    /// Produces the planned form of `query` by fetching its logical plan through the
    /// plan cache and converting it with `Query::planned_query_from_plan`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the cache lookup or from planning.
    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<PlannedQuery<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.map_cached_structural_plan_for_entity::<E, _>(
            query.structural(),
            Query::<E>::planned_query_from_plan,
        )
    }
414
    /// Explains `query` against the indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::explain_with_visible_indexes`; this path resolves
    /// visibility directly and does not go through the plan cache.
    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainPlan, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
    }
425
    /// Returns the plan hash of `query` as a hex string, computed against the
    /// indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::plan_hash_hex_with_visible_indexes`.
    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
    }
437
    /// Returns the execution-node explain descriptor for `query`, computed against
    /// the indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::explain_execution_with_visible_indexes`.
    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
    }
449
    /// Returns the verbose, textual execution explain for `query`, computed against
    /// the indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::explain_execution_verbose_with_visible_indexes`.
    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
    ) -> Result<String, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(
            query,
            Query::<E>::explain_execution_verbose_with_visible_indexes,
        )
    }
464
    /// Explains the aggregate terminal described by `strategy` for `query`, using
    /// the indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::explain_prepared_aggregate_terminal_with_visible_indexes`.
    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
        &self,
        query: &Query<E>,
        strategy: &S,
    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
        S: PreparedFluentAggregateExplainStrategy,
    {
        self.with_query_visible_indexes(query, |query, visible_indexes| {
            query
                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
        })
    }
481
    /// Explains the bytes-by terminal over `target_field` for `query`, using the
    /// indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::explain_bytes_by_with_visible_indexes`.
    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
        target_field: &str,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, |query, visible_indexes| {
            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
        })
    }
496
    /// Explains the projection terminal described by `strategy` for `query`, using
    /// the indexes currently visible for `E`'s store.
    ///
    /// Delegates to `Query::explain_prepared_projection_terminal_with_visible_indexes`.
    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
        &self,
        query: &Query<E>,
        strategy: &PreparedFluentProjectionStrategy,
    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
    where
        E: EntityValue + EntityKind<Canister = C>,
    {
        self.with_query_visible_indexes(query, |query, visible_indexes| {
            query.explain_prepared_projection_terminal_with_visible_indexes(
                visible_indexes,
                strategy,
            )
        })
    }
514
515 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
518 match family {
519 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
520 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
521 )),
522 ExecutionFamily::Ordered => Ok(()),
523 ExecutionFamily::Grouped => Err(QueryError::invariant(
524 "grouped queries execute via execute(), not page().execute()",
525 )),
526 }
527 }
528
529 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
532 match family {
533 ExecutionFamily::Grouped => Ok(()),
534 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
535 "grouped execution requires grouped logical plans",
536 )),
537 }
538 }
539
540 fn finalize_grouped_execution_page(
544 page: GroupedCursorPage,
545 trace: Option<ExecutionTrace>,
546 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
547 let next_cursor = page
548 .next_cursor
549 .map(|token| {
550 let Some(token) = token.as_grouped() else {
551 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
552 };
553
554 token.encode().map_err(|err| {
555 QueryError::serialize_internal(format!(
556 "failed to serialize grouped continuation cursor: {err}"
557 ))
558 })
559 })
560 .transpose()?;
561
562 Ok(PagedGroupedExecutionWithTrace::new(
563 page.rows,
564 next_cursor,
565 trace,
566 ))
567 }
568
    /// Executes a grouped query and wraps the paged grouped output in
    /// `LoadQueryResult::grouped`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from `execute_grouped`.
    fn execute_grouped_query_result<E>(
        &self,
        query: &Query<E>,
        cursor_token: Option<&str>,
    ) -> Result<LoadQueryResult<E>, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        self.execute_grouped(query, cursor_token)
            .map(LoadQueryResult::grouped)
    }
583
584 #[cfg(feature = "perf-attribution")]
588 fn execute_grouped_query_result_with_attribution<E>(
589 &self,
590 plan: PreparedExecutionPlan<E>,
591 ) -> Result<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>
592 where
593 E: PersistedRow<Canister = C> + EntityValue,
594 {
595 let (page, trace) = self.execute_grouped_plan_with_trace(plan, None)?;
596 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
597
598 Ok((
599 LoadQueryResult::grouped(grouped),
600 Self::empty_scalar_execute_phase_attribution(),
601 0,
602 ))
603 }
604
605 #[cfg(feature = "perf-attribution")]
609 fn execute_scalar_query_result_with_attribution<E>(
610 &self,
611 mode: QueryMode,
612 plan: PreparedExecutionPlan<E>,
613 ) -> Result<(LoadQueryResult<E>, ScalarExecutePhaseAttribution, u64), QueryError>
614 where
615 E: PersistedRow<Canister = C> + EntityValue,
616 {
617 match mode {
618 QueryMode::Load(_) => {
619 let (rows, phase_attribution, response_decode_local_instructions) = self
620 .load_executor::<E>()
621 .execute_with_phase_attribution(plan)
622 .map_err(QueryError::execute)?;
623
624 Ok((
625 LoadQueryResult::rows(rows),
626 phase_attribution,
627 response_decode_local_instructions,
628 ))
629 }
630 QueryMode::Delete(_) => {
631 let result = self.execute_query_dyn(mode, plan)?;
632
633 Ok((
634 LoadQueryResult::rows(result),
635 Self::empty_scalar_execute_phase_attribution(),
636 0,
637 ))
638 }
639 }
640 }
641
642 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
644 where
645 E: PersistedRow<Canister = C> + EntityValue,
646 {
647 let mode = query.mode();
649 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
650
651 self.execute_query_dyn(mode, plan)
653 }
654
655 #[cfg(feature = "perf-attribution")]
658 #[doc(hidden)]
659 pub fn execute_query_result_with_attribution<E>(
660 &self,
661 query: &Query<E>,
662 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
663 where
664 E: PersistedRow<Canister = C> + EntityValue,
665 {
666 let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
671 self.cached_prepared_query_plan_for_entity::<E>(query.structural())
672 });
673 let (plan, cache_attribution) = plan_and_cache?;
674
675 let (execute_local_instructions, result) = measure_query_stage(|| {
678 if query.has_grouping() {
679 self.execute_grouped_query_result_with_attribution(plan)
680 } else {
681 self.execute_scalar_query_result_with_attribution(query.mode(), plan)
682 }
683 });
684 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
685 let total_local_instructions =
686 compile_local_instructions.saturating_add(execute_local_instructions);
687
688 Ok((
689 result,
690 QueryExecutionAttribution {
691 compile_local_instructions,
692 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
693 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
694 direct_data_row_scan_local_instructions: execute_phase_attribution
695 .direct_data_row_scan_local_instructions,
696 direct_data_row_key_stream_local_instructions: execute_phase_attribution
697 .direct_data_row_key_stream_local_instructions,
698 direct_data_row_row_read_local_instructions: execute_phase_attribution
699 .direct_data_row_row_read_local_instructions,
700 direct_data_row_key_encode_local_instructions: execute_phase_attribution
701 .direct_data_row_key_encode_local_instructions,
702 direct_data_row_store_get_local_instructions: execute_phase_attribution
703 .direct_data_row_store_get_local_instructions,
704 direct_data_row_order_window_local_instructions: execute_phase_attribution
705 .direct_data_row_order_window_local_instructions,
706 direct_data_row_page_window_local_instructions: execute_phase_attribution
707 .direct_data_row_page_window_local_instructions,
708 response_decode_local_instructions,
709 execute_local_instructions,
710 total_local_instructions,
711 shared_query_plan_cache_hits: cache_attribution.hits,
712 shared_query_plan_cache_misses: cache_attribution.misses,
713 },
714 ))
715 }
716
717 #[doc(hidden)]
720 pub fn execute_query_result<E>(
721 &self,
722 query: &Query<E>,
723 ) -> Result<LoadQueryResult<E>, QueryError>
724 where
725 E: PersistedRow<Canister = C> + EntityValue,
726 {
727 if query.has_grouping() {
728 return self.execute_grouped_query_result(query, None);
729 }
730
731 self.execute_query(query).map(LoadQueryResult::rows)
732 }
733
734 #[doc(hidden)]
736 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
737 where
738 E: PersistedRow<Canister = C> + EntityValue,
739 {
740 if !query.mode().is_delete() {
742 return Err(QueryError::unsupported_query(
743 "delete count execution requires delete query mode",
744 ));
745 }
746
747 let plan = self
749 .compile_query_with_visible_indexes(query)?
750 .into_prepared_execution_plan();
751
752 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
754 .map_err(QueryError::execute)
755 }
756
757 pub(in crate::db) fn execute_query_dyn<E>(
762 &self,
763 mode: QueryMode,
764 plan: PreparedExecutionPlan<E>,
765 ) -> Result<EntityResponse<E>, QueryError>
766 where
767 E: PersistedRow<Canister = C> + EntityValue,
768 {
769 let result = match mode {
770 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
771 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
772 };
773
774 result.map_err(QueryError::execute)
775 }
776
777 pub(in crate::db) fn execute_load_query_with<E, T>(
780 &self,
781 query: &Query<E>,
782 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
783 ) -> Result<T, QueryError>
784 where
785 E: PersistedRow<Canister = C> + EntityValue,
786 {
787 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
788
789 self.with_metrics(|| op(self.load_executor::<E>(), plan))
790 .map_err(QueryError::execute)
791 }
792
793 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
798 where
799 E: EntityKind<Canister = C>,
800 {
801 let compiled = self.compile_query_with_visible_indexes(query)?;
802 let explain = compiled.explain();
803 let plan_hash = compiled.plan_hash_hex();
804
805 let (executable, _) =
806 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
807 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
808 let execution_family = match query.mode() {
809 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
810 QueryMode::Delete(_) => None,
811 };
812
813 Ok(QueryTracePlan::new(
814 plan_hash,
815 access_strategy,
816 execution_family,
817 explain,
818 ))
819 }
820
821 pub(crate) fn execute_load_query_paged_with_trace<E>(
823 &self,
824 query: &Query<E>,
825 cursor_token: Option<&str>,
826 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
827 where
828 E: PersistedRow<Canister = C> + EntityValue,
829 {
830 let plan = self
832 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
833 .0;
834 Self::ensure_scalar_paged_execution_family(
835 plan.execution_family().map_err(QueryError::execute)?,
836 )?;
837
838 let cursor_bytes = decode_optional_cursor_token(cursor_token)
840 .map_err(QueryError::from_cursor_plan_error)?;
841 let cursor = plan
842 .prepare_cursor(cursor_bytes.as_deref())
843 .map_err(QueryError::from_executor_plan_error)?;
844
845 let (page, trace) = self
847 .with_metrics(|| {
848 self.load_executor::<E>()
849 .execute_paged_with_cursor_traced(plan, cursor)
850 })
851 .map_err(QueryError::execute)?;
852 let next_cursor = page
853 .next_cursor
854 .map(|token| {
855 let Some(token) = token.as_scalar() else {
856 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
857 };
858
859 token.encode().map_err(|err| {
860 QueryError::serialize_internal(format!(
861 "failed to serialize continuation cursor: {err}"
862 ))
863 })
864 })
865 .transpose()?;
866
867 Ok(PagedLoadExecutionWithTrace::new(
868 page.items,
869 next_cursor,
870 trace,
871 ))
872 }
873
874 pub(in crate::db) fn execute_grouped<E>(
879 &self,
880 query: &Query<E>,
881 cursor_token: Option<&str>,
882 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
883 where
884 E: PersistedRow<Canister = C> + EntityValue,
885 {
886 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
887 Self::finalize_grouped_execution_page(page, trace)
888 }
889
890 fn execute_grouped_page_with_trace<E>(
893 &self,
894 query: &Query<E>,
895 cursor_token: Option<&str>,
896 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
897 where
898 E: PersistedRow<Canister = C> + EntityValue,
899 {
900 let plan = self
902 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
903 .0;
904
905 self.execute_grouped_plan_with_trace(plan, cursor_token)
907 }
908
909 fn execute_grouped_plan_with_trace<E>(
911 &self,
912 plan: PreparedExecutionPlan<E>,
913 cursor_token: Option<&str>,
914 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
915 where
916 E: PersistedRow<Canister = C> + EntityValue,
917 {
918 Self::ensure_grouped_execution_family(
920 plan.execution_family().map_err(QueryError::execute)?,
921 )?;
922
923 let cursor = decode_optional_grouped_cursor_token(cursor_token)
925 .map_err(QueryError::from_cursor_plan_error)?;
926 let cursor = plan
927 .prepare_grouped_cursor_token(cursor)
928 .map_err(QueryError::from_executor_plan_error)?;
929
930 self.with_metrics(|| {
933 self.load_executor::<E>()
934 .execute_grouped_paged_with_cursor_traced(plan, cursor)
935 })
936 .map_err(QueryError::execute)
937 }
938}
939
940impl QueryPlanCacheKey {
941 fn for_authority_with_normalized_predicate(
942 authority: crate::db::executor::EntityAuthority,
943 schema_fingerprint: CommitSchemaFingerprint,
944 visibility: QueryPlanVisibility,
945 query: &StructuralQuery,
946 normalized_predicate: Option<&crate::db::predicate::Predicate>,
947 ) -> Self {
948 Self::for_authority_with_normalized_predicate_and_method_version(
949 authority,
950 schema_fingerprint,
951 visibility,
952 query,
953 normalized_predicate,
954 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
955 )
956 }
957
958 const fn from_authority_parts(
962 authority: crate::db::executor::EntityAuthority,
963 schema_fingerprint: CommitSchemaFingerprint,
964 visibility: QueryPlanVisibility,
965 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
966 cache_method_version: u8,
967 ) -> Self {
968 Self {
969 cache_method_version,
970 entity_path: authority.entity_path(),
971 schema_fingerprint,
972 visibility,
973 structural_query,
974 }
975 }
976
977 #[cfg(test)]
978 fn for_authority_with_method_version(
979 authority: crate::db::executor::EntityAuthority,
980 schema_fingerprint: CommitSchemaFingerprint,
981 visibility: QueryPlanVisibility,
982 query: &StructuralQuery,
983 cache_method_version: u8,
984 ) -> Self {
985 Self::from_authority_parts(
986 authority,
987 schema_fingerprint,
988 visibility,
989 query.structural_cache_key(),
990 cache_method_version,
991 )
992 }
993
994 fn for_authority_with_normalized_predicate_and_method_version(
995 authority: crate::db::executor::EntityAuthority,
996 schema_fingerprint: CommitSchemaFingerprint,
997 visibility: QueryPlanVisibility,
998 query: &StructuralQuery,
999 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1000 cache_method_version: u8,
1001 ) -> Self {
1002 Self::from_authority_parts(
1003 authority,
1004 schema_fingerprint,
1005 visibility,
1006 query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1007 cache_method_version,
1008 )
1009 }
1010}