1#[cfg(feature = "perf-attribution")]
8use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
9use crate::{
10 db::{
11 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
12 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
13 access::AccessStrategy,
14 commit::CommitSchemaFingerprint,
15 cursor::{
16 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
17 },
18 diagnostics::ExecutionTrace,
19 executor::{
20 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
21 SharedPreparedExecutionPlan,
22 },
23 query::builder::{
24 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
25 },
26 query::explain::{
27 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
28 },
29 query::{
30 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
31 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
32 },
33 },
34 error::InternalError,
35 model::entity::EntityModel,
36 traits::{CanisterKind, EntityKind, EntityValue, Path},
37};
38#[cfg(feature = "perf-attribution")]
39use candid::CandidType;
40use icydb_utils::Xxh3;
41#[cfg(feature = "perf-attribution")]
42use serde::Deserialize;
43use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
44
45type CacheBuildHasher = BuildHasherDefault<Xxh3>;
46
47const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
50
51#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
52pub(in crate::db) enum QueryPlanVisibility {
53 StoreNotReady,
54 StoreReady,
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
58pub(in crate::db) struct QueryPlanCacheKey {
59 cache_method_version: u8,
60 entity_path: &'static str,
61 schema_fingerprint: CommitSchemaFingerprint,
62 visibility: QueryPlanVisibility,
63 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
64}
65
66#[derive(Clone, Debug)]
67pub(in crate::db) struct QueryPlanCacheEntry {
68 logical_plan: AccessPlannedQuery,
69 prepared_plan: SharedPreparedExecutionPlan,
70}
71
72impl QueryPlanCacheEntry {
73 #[must_use]
74 pub(in crate::db) const fn new(
75 logical_plan: AccessPlannedQuery,
76 prepared_plan: SharedPreparedExecutionPlan,
77 ) -> Self {
78 Self {
79 logical_plan,
80 prepared_plan,
81 }
82 }
83
84 #[must_use]
85 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
86 &self.logical_plan
87 }
88
89 #[must_use]
90 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
91 self.prepared_plan.typed_clone::<E>()
92 }
93
94 #[must_use]
95 pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
96 &self.prepared_plan
97 }
98}
99
100pub(in crate::db) type QueryPlanCache =
101 HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
102
103thread_local! {
104 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
109 RefCell::new(HashMap::default());
110}
111
112#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
113pub(in crate::db) struct QueryPlanCacheAttribution {
114 pub hits: u64,
115 pub misses: u64,
116}
117
118impl QueryPlanCacheAttribution {
119 #[must_use]
120 const fn hit() -> Self {
121 Self { hits: 1, misses: 0 }
122 }
123
124 #[must_use]
125 const fn miss() -> Self {
126 Self { hits: 0, misses: 1 }
127 }
128}
129
/// Instruction and cache attribution for one query, as returned by
/// `DbSession::execute_query_result_with_attribution`.
///
/// The `direct_data_row_*` fields are populated for scalar loads and the
/// `grouped_*` fields for grouped queries; the other family's fields are zero.
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Counter delta measured around resolving the cached prepared plan.
    pub compile_local_instructions: u64,
    /// Runtime-phase instructions reported by the executor.
    pub runtime_local_instructions: u64,
    /// Finalize-phase instructions reported by the executor.
    pub finalize_local_instructions: u64,
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    /// Response-decode instructions reported for scalar loads; zero for
    /// grouped and delete execution.
    pub response_decode_local_instructions: u64,
    /// Counter delta measured around the whole execute stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute deltas.
    pub total_local_instructions: u64,
    /// Plan-cache hits recorded while resolving the plan.
    pub shared_query_plan_cache_hits: u64,
    /// Plan-cache misses recorded while resolving the plan.
    pub shared_query_plan_cache_misses: u64,
}
162
/// Session-internal execute-phase attribution covering both executor families,
/// so scalar and grouped results can be folded into one report shape.
#[cfg(feature = "perf-attribution")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    // Phase totals common to both families.
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    // Scalar (direct data row) breakdown; zero for grouped execution.
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    // Grouped breakdown; zero for scalar execution.
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    grouped_count_borrowed_hash_computations: u64,
    grouped_count_bucket_candidate_checks: u64,
    grouped_count_existing_group_hits: u64,
    grouped_count_new_group_inserts: u64,
}
183
/// Reads the current instruction counter used for query attribution.
///
/// On `wasm32` this is `canic_cdk::api::performance_counter(1)`; every other
/// target has no counter and always reports `0`.
// NOTE(review): counter type `1` is presumably the call-context instruction
// counter — confirm against the CDK documentation.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
200
/// Runs `run` and returns the instruction-counter delta alongside its result.
///
/// The delta is a saturating subtraction of the counter reading taken before
/// the call from the reading taken after it. The result is passed through
/// untouched, whether it is `Ok` or `Err`.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let counter_before = read_query_local_instruction_counter();
    let outcome = run();
    let counter_after = read_query_local_instruction_counter();

    (counter_after.saturating_sub(counter_before), outcome)
}
209
210impl<C: CanisterKind> DbSession<C> {
/// All-zero execute-phase attribution.
///
/// Spelled out field by field rather than via `Default::default()` so the
/// function can stay `const`.
#[cfg(feature = "perf-attribution")]
const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
    QueryExecutePhaseAttribution {
        // Phase totals.
        runtime_local_instructions: 0,
        finalize_local_instructions: 0,
        // Scalar breakdown.
        direct_data_row_scan_local_instructions: 0,
        direct_data_row_key_stream_local_instructions: 0,
        direct_data_row_row_read_local_instructions: 0,
        direct_data_row_key_encode_local_instructions: 0,
        direct_data_row_store_get_local_instructions: 0,
        direct_data_row_order_window_local_instructions: 0,
        direct_data_row_page_window_local_instructions: 0,
        // Grouped breakdown.
        grouped_stream_local_instructions: 0,
        grouped_fold_local_instructions: 0,
        grouped_finalize_local_instructions: 0,
        grouped_count_borrowed_hash_computations: 0,
        grouped_count_bucket_candidate_checks: 0,
        grouped_count_existing_group_hits: 0,
        grouped_count_new_group_inserts: 0,
    }
}
232
/// Converts scalar executor attribution into the session-level shape.
///
/// Phase totals and the `direct_data_row_*` breakdown are copied from `phase`;
/// every grouped field is zero.
#[cfg(feature = "perf-attribution")]
const fn scalar_query_execute_phase_attribution(
    phase: ScalarExecutePhaseAttribution,
) -> QueryExecutePhaseAttribution {
    QueryExecutePhaseAttribution {
        runtime_local_instructions: phase.runtime_local_instructions,
        finalize_local_instructions: phase.finalize_local_instructions,
        direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
        direct_data_row_key_stream_local_instructions: phase
            .direct_data_row_key_stream_local_instructions,
        direct_data_row_row_read_local_instructions: phase
            .direct_data_row_row_read_local_instructions,
        direct_data_row_key_encode_local_instructions: phase
            .direct_data_row_key_encode_local_instructions,
        direct_data_row_store_get_local_instructions: phase
            .direct_data_row_store_get_local_instructions,
        direct_data_row_order_window_local_instructions: phase
            .direct_data_row_order_window_local_instructions,
        direct_data_row_page_window_local_instructions: phase
            .direct_data_row_page_window_local_instructions,
        // Remaining (grouped) fields come from the all-zero baseline.
        ..Self::empty_query_execute_phase_attribution()
    }
}
262
/// Converts grouped executor attribution into the session-level shape.
///
/// The runtime total is the saturating sum of the stream and fold phases.
/// The grouped breakdown is copied from `phase`, and every
/// `direct_data_row_*` field is zero.
#[cfg(feature = "perf-attribution")]
const fn grouped_query_execute_phase_attribution(
    phase: GroupedExecutePhaseAttribution,
) -> QueryExecutePhaseAttribution {
    QueryExecutePhaseAttribution {
        runtime_local_instructions: phase
            .stream_local_instructions
            .saturating_add(phase.fold_local_instructions),
        finalize_local_instructions: phase.finalize_local_instructions,
        grouped_stream_local_instructions: phase.stream_local_instructions,
        grouped_fold_local_instructions: phase.fold_local_instructions,
        grouped_finalize_local_instructions: phase.finalize_local_instructions,
        grouped_count_borrowed_hash_computations: phase
            .grouped_count_borrowed_hash_computations,
        grouped_count_bucket_candidate_checks: phase.grouped_count_bucket_candidate_checks,
        grouped_count_existing_group_hits: phase.grouped_count_existing_group_hits,
        grouped_count_new_group_inserts: phase.grouped_count_new_group_inserts,
        // Remaining (scalar) fields come from the all-zero baseline.
        ..Self::empty_query_execute_phase_attribution()
    }
}
289
290 fn query_plan_cache_scope_id(&self) -> usize {
291 self.db.cache_scope_id()
292 }
293
294 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
295 let scope_id = self.query_plan_cache_scope_id();
296
297 QUERY_PLAN_CACHES.with(|caches| {
298 let mut caches = caches.borrow_mut();
299 let cache = caches.entry(scope_id).or_default();
300
301 f(cache)
302 })
303 }
304
305 const fn visible_indexes_for_model(
306 model: &'static EntityModel,
307 visibility: QueryPlanVisibility,
308 ) -> VisibleIndexes<'static> {
309 match visibility {
310 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
311 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
312 }
313 }
314
/// Test helper: number of entries currently held in this session's plan cache.
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
    self.with_query_plan_cache(|plans| plans.len())
}
319
/// Test helper: drops every entry from this session's plan cache.
#[cfg(test)]
pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
    // `clear` already has the `FnOnce(&mut QueryPlanCache)` shape, so it is
    // passed directly instead of being wrapped in a closure.
    self.with_query_plan_cache(QueryPlanCache::clear);
}
324
325 pub(in crate::db) fn query_plan_visibility_for_store_path(
326 &self,
327 store_path: &'static str,
328 ) -> Result<QueryPlanVisibility, QueryError> {
329 let store = self
330 .db
331 .recovered_store(store_path)
332 .map_err(QueryError::execute)?;
333 let visibility = if store.index_state() == crate::db::IndexState::Ready {
334 QueryPlanVisibility::StoreReady
335 } else {
336 QueryPlanVisibility::StoreNotReady
337 };
338
339 Ok(visibility)
340 }
341
342 pub(in crate::db) fn cached_query_plan_entry_for_authority(
343 &self,
344 authority: crate::db::executor::EntityAuthority,
345 schema_fingerprint: CommitSchemaFingerprint,
346 query: &StructuralQuery,
347 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
348 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
349 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
350 let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
351 let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
352 authority,
353 schema_fingerprint,
354 visibility,
355 query,
356 normalized_predicate.as_ref(),
357 );
358
359 {
360 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
361 if let Some(entry) = cached {
362 return Ok((entry, QueryPlanCacheAttribution::hit()));
363 }
364 }
365
366 let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
367 &visible_indexes,
368 normalized_predicate,
369 )?;
370 let entry = QueryPlanCacheEntry::new(
371 plan.clone(),
372 SharedPreparedExecutionPlan::from_plan(authority, plan),
373 );
374 self.with_query_plan_cache(|cache| {
375 cache.insert(cache_key, entry.clone());
376 });
377
378 Ok((entry, QueryPlanCacheAttribution::miss()))
379 }
380
/// Test helper: builds a cache key with an explicit method version, using the
/// query's plain structural cache key (no normalized predicate).
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_key_for_tests(
    authority: crate::db::executor::EntityAuthority,
    schema_fingerprint: CommitSchemaFingerprint,
    visibility: QueryPlanVisibility,
    query: &StructuralQuery,
    cache_method_version: u8,
) -> QueryPlanCacheKey {
    QueryPlanCacheKey::for_authority_with_method_version(
        authority,
        schema_fingerprint,
        visibility,
        query,
        cache_method_version,
    )
}
397
398 pub(in crate::db) fn cached_structural_plan_for_authority(
399 &self,
400 authority: crate::db::executor::EntityAuthority,
401 schema_fingerprint: CommitSchemaFingerprint,
402 query: &StructuralQuery,
403 ) -> Result<AccessPlannedQuery, QueryError> {
404 let (entry, _) =
405 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
406
407 Ok(entry.logical_plan().clone())
408 }
409
410 fn with_query_visible_indexes<E, T>(
413 &self,
414 query: &Query<E>,
415 op: impl FnOnce(
416 &Query<E>,
417 &crate::db::query::plan::VisibleIndexes<'static>,
418 ) -> Result<T, QueryError>,
419 ) -> Result<T, QueryError>
420 where
421 E: EntityKind<Canister = C>,
422 {
423 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
424 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
425
426 op(query, &visible_indexes)
427 }
428
429 fn cached_structural_plan_for_entity<E>(
432 &self,
433 query: &StructuralQuery,
434 ) -> Result<AccessPlannedQuery, QueryError>
435 where
436 E: EntityKind<Canister = C>,
437 {
438 self.cached_structural_plan_for_authority(
439 crate::db::executor::EntityAuthority::for_type::<E>(),
440 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
441 query,
442 )
443 }
444
445 fn map_cached_structural_plan_for_entity<E, T>(
449 &self,
450 query: &StructuralQuery,
451 map: impl FnOnce(AccessPlannedQuery) -> T,
452 ) -> Result<T, QueryError>
453 where
454 E: EntityKind<Canister = C>,
455 {
456 let plan = self.cached_structural_plan_for_entity::<E>(query)?;
457
458 Ok(map(plan))
459 }
460
461 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
462 &self,
463 query: &StructuralQuery,
464 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
465 where
466 E: EntityKind<Canister = C>,
467 {
468 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
469 crate::db::executor::EntityAuthority::for_type::<E>(),
470 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
471 query,
472 )?;
473
474 Ok((entry.typed_prepared_plan::<E>(), attribution))
475 }
476
477 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
480 &self,
481 query: &Query<E>,
482 ) -> Result<CompiledQuery<E>, QueryError>
483 where
484 E: EntityKind<Canister = C>,
485 {
486 self.map_cached_structural_plan_for_entity::<E, _>(
487 query.structural(),
488 Query::<E>::compiled_query_from_plan,
489 )
490 }
491
492 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
495 &self,
496 query: &Query<E>,
497 ) -> Result<PlannedQuery<E>, QueryError>
498 where
499 E: EntityKind<Canister = C>,
500 {
501 self.map_cached_structural_plan_for_entity::<E, _>(
502 query.structural(),
503 Query::<E>::planned_query_from_plan,
504 )
505 }
506
507 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
509 &self,
510 query: &Query<E>,
511 ) -> Result<ExplainPlan, QueryError>
512 where
513 E: EntityKind<Canister = C>,
514 {
515 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
516 }
517
518 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
521 &self,
522 query: &Query<E>,
523 ) -> Result<String, QueryError>
524 where
525 E: EntityKind<Canister = C>,
526 {
527 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
528 }
529
530 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
533 &self,
534 query: &Query<E>,
535 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
536 where
537 E: EntityValue + EntityKind<Canister = C>,
538 {
539 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
540 }
541
542 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
545 &self,
546 query: &Query<E>,
547 ) -> Result<String, QueryError>
548 where
549 E: EntityValue + EntityKind<Canister = C>,
550 {
551 self.with_query_visible_indexes(
552 query,
553 Query::<E>::explain_execution_verbose_with_visible_indexes,
554 )
555 }
556
557 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
560 &self,
561 query: &Query<E>,
562 strategy: &S,
563 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
564 where
565 E: EntityValue + EntityKind<Canister = C>,
566 S: PreparedFluentAggregateExplainStrategy,
567 {
568 self.with_query_visible_indexes(query, |query, visible_indexes| {
569 query
570 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
571 })
572 }
573
574 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
577 &self,
578 query: &Query<E>,
579 target_field: &str,
580 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
581 where
582 E: EntityValue + EntityKind<Canister = C>,
583 {
584 self.with_query_visible_indexes(query, |query, visible_indexes| {
585 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
586 })
587 }
588
589 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
592 &self,
593 query: &Query<E>,
594 strategy: &PreparedFluentProjectionStrategy,
595 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
596 where
597 E: EntityValue + EntityKind<Canister = C>,
598 {
599 self.with_query_visible_indexes(query, |query, visible_indexes| {
600 query.explain_prepared_projection_terminal_with_visible_indexes(
601 visible_indexes,
602 strategy,
603 )
604 })
605 }
606
607 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
610 match family {
611 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
612 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
613 )),
614 ExecutionFamily::Ordered => Ok(()),
615 ExecutionFamily::Grouped => Err(QueryError::invariant(
616 "grouped queries execute via execute(), not page().execute()",
617 )),
618 }
619 }
620
621 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
624 match family {
625 ExecutionFamily::Grouped => Ok(()),
626 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
627 "grouped execution requires grouped logical plans",
628 )),
629 }
630 }
631
632 fn finalize_grouped_execution_page(
636 page: GroupedCursorPage,
637 trace: Option<ExecutionTrace>,
638 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
639 let next_cursor = page
640 .next_cursor
641 .map(|token| {
642 let Some(token) = token.as_grouped() else {
643 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
644 };
645
646 token.encode().map_err(|err| {
647 QueryError::serialize_internal(format!(
648 "failed to serialize grouped continuation cursor: {err}"
649 ))
650 })
651 })
652 .transpose()?;
653
654 Ok(PagedGroupedExecutionWithTrace::new(
655 page.rows,
656 next_cursor,
657 trace,
658 ))
659 }
660
661 fn execute_grouped_query_result<E>(
665 &self,
666 query: &Query<E>,
667 cursor_token: Option<&str>,
668 ) -> Result<LoadQueryResult<E>, QueryError>
669 where
670 E: PersistedRow<Canister = C> + EntityValue,
671 {
672 self.execute_grouped(query, cursor_token)
673 .map(LoadQueryResult::grouped)
674 }
675
/// Executes a grouped plan without a cursor and reports phase attribution.
///
/// The third tuple element (response-decode instructions) is always `0` on
/// the grouped path.
#[cfg(feature = "perf-attribution")]
fn execute_grouped_query_result_with_attribution<E>(
    &self,
    plan: PreparedExecutionPlan<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (page, trace, grouped_phase) =
        self.execute_grouped_plan_with_trace_with_phase_attribution(plan, None)?;
    let paged = Self::finalize_grouped_execution_page(page, trace)?;
    let result = LoadQueryResult::grouped(paged);
    let phase = Self::grouped_query_execute_phase_attribution(grouped_phase);

    Ok((result, phase, 0))
}
697
/// Executes a scalar plan and reports phase attribution.
///
/// Loads go through the attribution-aware load executor. Deletes run through
/// the regular dynamic path and report all-zero attribution and zero
/// response-decode instructions.
#[cfg(feature = "perf-attribution")]
fn execute_scalar_query_result_with_attribution<E>(
    &self,
    mode: QueryMode,
    plan: PreparedExecutionPlan<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    match mode {
        QueryMode::Load(_) => {
            let (rows, scalar_phase, response_decode_local_instructions) = self
                .load_executor::<E>()
                .execute_with_phase_attribution(plan)
                .map_err(QueryError::execute)?;
            let phase = Self::scalar_query_execute_phase_attribution(scalar_phase);

            Ok((
                LoadQueryResult::rows(rows),
                phase,
                response_decode_local_instructions,
            ))
        }
        QueryMode::Delete(_) => {
            let deleted = self.execute_query_dyn(mode, plan)?;
            let phase = Self::empty_query_execute_phase_attribution();

            Ok((LoadQueryResult::rows(deleted), phase, 0))
        }
    }
}
734
735 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
737 where
738 E: PersistedRow<Canister = C> + EntityValue,
739 {
740 let mode = query.mode();
742 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
743
744 self.execute_query_dyn(mode, plan)
746 }
747
/// Executes `query` and returns its result together with instruction and
/// plan-cache attribution.
///
/// The compile figure is measured around cached plan resolution and the
/// execute figure around the whole execution stage. The total is their
/// saturating sum.
///
/// # Errors
/// Propagates plan-resolution and execution failures.
#[cfg(feature = "perf-attribution")]
#[doc(hidden)]
pub fn execute_query_result_with_attribution<E>(
    &self,
    query: &Query<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // Stage 1: resolve the prepared plan, through the shared plan cache.
    let (compile_local_instructions, resolved) = measure_query_stage(|| {
        self.cached_prepared_query_plan_for_entity::<E>(query.structural())
    });
    let (plan, cache) = resolved?;

    // Stage 2: execute on the grouped or the scalar path.
    let (execute_local_instructions, executed) = measure_query_stage(|| {
        if query.has_grouping() {
            self.execute_grouped_query_result_with_attribution(plan)
        } else {
            self.execute_scalar_query_result_with_attribution(query.mode(), plan)
        }
    });
    let (result, phase, response_decode_local_instructions) = executed?;

    let attribution = QueryExecutionAttribution {
        compile_local_instructions,
        runtime_local_instructions: phase.runtime_local_instructions,
        finalize_local_instructions: phase.finalize_local_instructions,
        direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
        direct_data_row_key_stream_local_instructions: phase
            .direct_data_row_key_stream_local_instructions,
        direct_data_row_row_read_local_instructions: phase
            .direct_data_row_row_read_local_instructions,
        direct_data_row_key_encode_local_instructions: phase
            .direct_data_row_key_encode_local_instructions,
        direct_data_row_store_get_local_instructions: phase
            .direct_data_row_store_get_local_instructions,
        direct_data_row_order_window_local_instructions: phase
            .direct_data_row_order_window_local_instructions,
        direct_data_row_page_window_local_instructions: phase
            .direct_data_row_page_window_local_instructions,
        grouped_stream_local_instructions: phase.grouped_stream_local_instructions,
        grouped_fold_local_instructions: phase.grouped_fold_local_instructions,
        grouped_finalize_local_instructions: phase.grouped_finalize_local_instructions,
        grouped_count_borrowed_hash_computations: phase
            .grouped_count_borrowed_hash_computations,
        grouped_count_bucket_candidate_checks: phase.grouped_count_bucket_candidate_checks,
        grouped_count_existing_group_hits: phase.grouped_count_existing_group_hits,
        grouped_count_new_group_inserts: phase.grouped_count_new_group_inserts,
        response_decode_local_instructions,
        execute_local_instructions,
        total_local_instructions: compile_local_instructions
            .saturating_add(execute_local_instructions),
        shared_query_plan_cache_hits: cache.hits,
        shared_query_plan_cache_misses: cache.misses,
    };

    Ok((result, attribution))
}
823
824 #[doc(hidden)]
827 pub fn execute_query_result<E>(
828 &self,
829 query: &Query<E>,
830 ) -> Result<LoadQueryResult<E>, QueryError>
831 where
832 E: PersistedRow<Canister = C> + EntityValue,
833 {
834 if query.has_grouping() {
835 return self.execute_grouped_query_result(query, None);
836 }
837
838 self.execute_query(query).map(LoadQueryResult::rows)
839 }
840
841 #[doc(hidden)]
843 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
844 where
845 E: PersistedRow<Canister = C> + EntityValue,
846 {
847 if !query.mode().is_delete() {
849 return Err(QueryError::unsupported_query(
850 "delete count execution requires delete query mode",
851 ));
852 }
853
854 let plan = self
856 .compile_query_with_visible_indexes(query)?
857 .into_prepared_execution_plan();
858
859 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
861 .map_err(QueryError::execute)
862 }
863
864 pub(in crate::db) fn execute_query_dyn<E>(
869 &self,
870 mode: QueryMode,
871 plan: PreparedExecutionPlan<E>,
872 ) -> Result<EntityResponse<E>, QueryError>
873 where
874 E: PersistedRow<Canister = C> + EntityValue,
875 {
876 let result = match mode {
877 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
878 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
879 };
880
881 result.map_err(QueryError::execute)
882 }
883
884 pub(in crate::db) fn execute_load_query_with<E, T>(
887 &self,
888 query: &Query<E>,
889 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
890 ) -> Result<T, QueryError>
891 where
892 E: PersistedRow<Canister = C> + EntityValue,
893 {
894 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
895
896 self.with_metrics(|| op(self.load_executor::<E>(), plan))
897 .map_err(QueryError::execute)
898 }
899
900 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
905 where
906 E: EntityKind<Canister = C>,
907 {
908 let compiled = self.compile_query_with_visible_indexes(query)?;
909 let explain = compiled.explain();
910 let plan_hash = compiled.plan_hash_hex();
911
912 let (executable, _) =
913 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
914 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
915 let execution_family = match query.mode() {
916 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
917 QueryMode::Delete(_) => None,
918 };
919
920 Ok(QueryTracePlan::new(
921 plan_hash,
922 access_strategy,
923 execution_family,
924 explain,
925 ))
926 }
927
928 pub(crate) fn execute_load_query_paged_with_trace<E>(
930 &self,
931 query: &Query<E>,
932 cursor_token: Option<&str>,
933 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
934 where
935 E: PersistedRow<Canister = C> + EntityValue,
936 {
937 let plan = self
939 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
940 .0;
941 Self::ensure_scalar_paged_execution_family(
942 plan.execution_family().map_err(QueryError::execute)?,
943 )?;
944
945 let cursor_bytes = decode_optional_cursor_token(cursor_token)
947 .map_err(QueryError::from_cursor_plan_error)?;
948 let cursor = plan
949 .prepare_cursor(cursor_bytes.as_deref())
950 .map_err(QueryError::from_executor_plan_error)?;
951
952 let (page, trace) = self
954 .with_metrics(|| {
955 self.load_executor::<E>()
956 .execute_paged_with_cursor_traced(plan, cursor)
957 })
958 .map_err(QueryError::execute)?;
959 let next_cursor = page
960 .next_cursor
961 .map(|token| {
962 let Some(token) = token.as_scalar() else {
963 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
964 };
965
966 token.encode().map_err(|err| {
967 QueryError::serialize_internal(format!(
968 "failed to serialize continuation cursor: {err}"
969 ))
970 })
971 })
972 .transpose()?;
973
974 Ok(PagedLoadExecutionWithTrace::new(
975 page.items,
976 next_cursor,
977 trace,
978 ))
979 }
980
981 pub(in crate::db) fn execute_grouped<E>(
986 &self,
987 query: &Query<E>,
988 cursor_token: Option<&str>,
989 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
990 where
991 E: PersistedRow<Canister = C> + EntityValue,
992 {
993 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
994 Self::finalize_grouped_execution_page(page, trace)
995 }
996
997 fn execute_grouped_page_with_trace<E>(
1000 &self,
1001 query: &Query<E>,
1002 cursor_token: Option<&str>,
1003 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1004 where
1005 E: PersistedRow<Canister = C> + EntityValue,
1006 {
1007 let plan = self
1009 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
1010 .0;
1011
1012 self.execute_grouped_plan_with_trace(plan, cursor_token)
1014 }
1015
1016 fn execute_grouped_plan_with_trace<E>(
1018 &self,
1019 plan: PreparedExecutionPlan<E>,
1020 cursor_token: Option<&str>,
1021 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1022 where
1023 E: PersistedRow<Canister = C> + EntityValue,
1024 {
1025 Self::ensure_grouped_execution_family(
1027 plan.execution_family().map_err(QueryError::execute)?,
1028 )?;
1029
1030 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1032 .map_err(QueryError::from_cursor_plan_error)?;
1033 let cursor = plan
1034 .prepare_grouped_cursor_token(cursor)
1035 .map_err(QueryError::from_executor_plan_error)?;
1036
1037 self.with_metrics(|| {
1040 self.load_executor::<E>()
1041 .execute_grouped_paged_with_cursor_traced(plan, cursor)
1042 })
1043 .map_err(QueryError::execute)
1044 }
1045
/// Attribution-aware twin of `execute_grouped_plan_with_trace`: same
/// validation and cursor preparation, but the executor additionally reports
/// grouped phase attribution.
///
/// # Errors
/// Fails when the plan is not in the `Grouped` family, when the cursor token
/// cannot be decoded or validated against the plan, or when execution fails.
#[cfg(feature = "perf-attribution")]
fn execute_grouped_plan_with_trace_with_phase_attribution<E>(
    &self,
    plan: PreparedExecutionPlan<E>,
    cursor_token: Option<&str>,
) -> Result<
    (
        GroupedCursorPage,
        Option<ExecutionTrace>,
        GroupedExecutePhaseAttribution,
    ),
    QueryError,
>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let family = plan.execution_family().map_err(QueryError::execute)?;
    Self::ensure_grouped_execution_family(family)?;

    // Decode the external token, then let the plan validate and prepare it.
    let decoded = decode_optional_grouped_cursor_token(cursor_token)
        .map_err(QueryError::from_cursor_plan_error)?;
    let prepared_cursor = plan
        .prepare_grouped_cursor_token(decoded)
        .map_err(QueryError::from_executor_plan_error)?;

    self.with_metrics(|| {
        self.load_executor::<E>()
            .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
                plan,
                prepared_cursor,
            )
    })
    .map_err(QueryError::execute)
}
1078}
1079
1080impl QueryPlanCacheKey {
1081 fn for_authority_with_normalized_predicate(
1082 authority: crate::db::executor::EntityAuthority,
1083 schema_fingerprint: CommitSchemaFingerprint,
1084 visibility: QueryPlanVisibility,
1085 query: &StructuralQuery,
1086 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1087 ) -> Self {
1088 Self::for_authority_with_normalized_predicate_and_method_version(
1089 authority,
1090 schema_fingerprint,
1091 visibility,
1092 query,
1093 normalized_predicate,
1094 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
1095 )
1096 }
1097
1098 const fn from_authority_parts(
1102 authority: crate::db::executor::EntityAuthority,
1103 schema_fingerprint: CommitSchemaFingerprint,
1104 visibility: QueryPlanVisibility,
1105 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1106 cache_method_version: u8,
1107 ) -> Self {
1108 Self {
1109 cache_method_version,
1110 entity_path: authority.entity_path(),
1111 schema_fingerprint,
1112 visibility,
1113 structural_query,
1114 }
1115 }
1116
1117 #[cfg(test)]
1118 fn for_authority_with_method_version(
1119 authority: crate::db::executor::EntityAuthority,
1120 schema_fingerprint: CommitSchemaFingerprint,
1121 visibility: QueryPlanVisibility,
1122 query: &StructuralQuery,
1123 cache_method_version: u8,
1124 ) -> Self {
1125 Self::from_authority_parts(
1126 authority,
1127 schema_fingerprint,
1128 visibility,
1129 query.structural_cache_key(),
1130 cache_method_version,
1131 )
1132 }
1133
1134 fn for_authority_with_normalized_predicate_and_method_version(
1135 authority: crate::db::executor::EntityAuthority,
1136 schema_fingerprint: CommitSchemaFingerprint,
1137 visibility: QueryPlanVisibility,
1138 query: &StructuralQuery,
1139 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1140 cache_method_version: u8,
1141 ) -> Self {
1142 Self::from_authority_parts(
1143 authority,
1144 schema_fingerprint,
1145 visibility,
1146 query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1147 cache_method_version,
1148 )
1149 }
1150}