1#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12 db::{
13 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15 access::AccessStrategy,
16 commit::CommitSchemaFingerprint,
17 cursor::{
18 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19 },
20 diagnostics::ExecutionTrace,
21 executor::{
22 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23 SharedPreparedExecutionPlan,
24 },
25 query::builder::{
26 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
27 },
28 query::explain::{
29 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
30 },
31 query::{
32 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
33 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
34 },
35 },
36 error::InternalError,
37 model::entity::EntityModel,
38 traits::{CanisterKind, EntityKind, EntityValue, Path},
39};
40#[cfg(feature = "diagnostics")]
41use candid::CandidType;
42use icydb_utils::Xxh3;
43#[cfg(feature = "diagnostics")]
44use serde::Deserialize;
45use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
46
47type CacheBuildHasher = BuildHasherDefault<Xxh3>;
48
49const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
52
53#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
54pub(in crate::db) enum QueryPlanVisibility {
55 StoreNotReady,
56 StoreReady,
57}
58
59#[derive(Clone, Debug, Eq, Hash, PartialEq)]
60pub(in crate::db) struct QueryPlanCacheKey {
61 cache_method_version: u8,
62 entity_path: &'static str,
63 schema_fingerprint: CommitSchemaFingerprint,
64 visibility: QueryPlanVisibility,
65 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
66}
67
68#[derive(Clone, Debug)]
69pub(in crate::db) struct QueryPlanCacheEntry {
70 logical_plan: AccessPlannedQuery,
71 prepared_plan: SharedPreparedExecutionPlan,
72}
73
74impl QueryPlanCacheEntry {
75 #[must_use]
76 pub(in crate::db) const fn new(
77 logical_plan: AccessPlannedQuery,
78 prepared_plan: SharedPreparedExecutionPlan,
79 ) -> Self {
80 Self {
81 logical_plan,
82 prepared_plan,
83 }
84 }
85
86 #[must_use]
87 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
88 &self.logical_plan
89 }
90
91 #[must_use]
92 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
93 self.prepared_plan.typed_clone::<E>()
94 }
95
96 #[must_use]
97 pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
98 &self.prepared_plan
99 }
100}
101
102pub(in crate::db) type QueryPlanCache =
103 HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
104
105thread_local! {
106 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
111 RefCell::new(HashMap::default());
112}
113
114#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
115pub(in crate::db) struct QueryPlanCacheAttribution {
116 pub hits: u64,
117 pub misses: u64,
118}
119
120impl QueryPlanCacheAttribution {
121 #[must_use]
122 const fn hit() -> Self {
123 Self { hits: 1, misses: 0 }
124 }
125
126 #[must_use]
127 const fn miss() -> Self {
128 Self { hits: 0, misses: 1 }
129 }
130}
131
132#[cfg(feature = "diagnostics")]
139#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
140pub struct QueryExecutionAttribution {
141 pub compile_local_instructions: u64,
142 pub runtime_local_instructions: u64,
143 pub finalize_local_instructions: u64,
144 pub direct_data_row_scan_local_instructions: u64,
145 pub direct_data_row_key_stream_local_instructions: u64,
146 pub direct_data_row_row_read_local_instructions: u64,
147 pub direct_data_row_key_encode_local_instructions: u64,
148 pub direct_data_row_store_get_local_instructions: u64,
149 pub direct_data_row_order_window_local_instructions: u64,
150 pub direct_data_row_page_window_local_instructions: u64,
151 pub grouped_stream_local_instructions: u64,
152 pub grouped_fold_local_instructions: u64,
153 pub grouped_finalize_local_instructions: u64,
154 pub grouped_count_borrowed_hash_computations: u64,
155 pub grouped_count_bucket_candidate_checks: u64,
156 pub grouped_count_existing_group_hits: u64,
157 pub grouped_count_new_group_inserts: u64,
158 pub grouped_count_row_materialization_local_instructions: u64,
159 pub grouped_count_group_lookup_local_instructions: u64,
160 pub grouped_count_existing_group_update_local_instructions: u64,
161 pub grouped_count_new_group_insert_local_instructions: u64,
162 pub response_decode_local_instructions: u64,
163 pub execute_local_instructions: u64,
164 pub total_local_instructions: u64,
165 pub shared_query_plan_cache_hits: u64,
166 pub shared_query_plan_cache_misses: u64,
167}
168
169#[cfg(feature = "diagnostics")]
170#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
171struct QueryExecutePhaseAttribution {
172 runtime_local_instructions: u64,
173 finalize_local_instructions: u64,
174 direct_data_row_scan_local_instructions: u64,
175 direct_data_row_key_stream_local_instructions: u64,
176 direct_data_row_row_read_local_instructions: u64,
177 direct_data_row_key_encode_local_instructions: u64,
178 direct_data_row_store_get_local_instructions: u64,
179 direct_data_row_order_window_local_instructions: u64,
180 direct_data_row_page_window_local_instructions: u64,
181 grouped_stream_local_instructions: u64,
182 grouped_fold_local_instructions: u64,
183 grouped_finalize_local_instructions: u64,
184 grouped_count: GroupedCountAttribution,
185}
186
187#[cfg(feature = "diagnostics")]
188#[expect(
189 clippy::missing_const_for_fn,
190 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
191)]
192fn read_query_local_instruction_counter() -> u64 {
193 #[cfg(target_arch = "wasm32")]
194 {
195 canic_cdk::api::performance_counter(1)
196 }
197
198 #[cfg(not(target_arch = "wasm32"))]
199 {
200 0
201 }
202}
203
204#[cfg(feature = "diagnostics")]
205fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
206 let start = read_query_local_instruction_counter();
207 let result = run();
208 let delta = read_query_local_instruction_counter().saturating_sub(start);
209
210 (delta, result)
211}
212
213impl<C: CanisterKind> DbSession<C> {
214 #[cfg(feature = "diagnostics")]
215 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
216 QueryExecutePhaseAttribution {
217 runtime_local_instructions: 0,
218 finalize_local_instructions: 0,
219 direct_data_row_scan_local_instructions: 0,
220 direct_data_row_key_stream_local_instructions: 0,
221 direct_data_row_row_read_local_instructions: 0,
222 direct_data_row_key_encode_local_instructions: 0,
223 direct_data_row_store_get_local_instructions: 0,
224 direct_data_row_order_window_local_instructions: 0,
225 direct_data_row_page_window_local_instructions: 0,
226 grouped_stream_local_instructions: 0,
227 grouped_fold_local_instructions: 0,
228 grouped_finalize_local_instructions: 0,
229 grouped_count: GroupedCountAttribution::none(),
230 }
231 }
232
233 #[cfg(feature = "diagnostics")]
234 const fn scalar_query_execute_phase_attribution(
235 phase: ScalarExecutePhaseAttribution,
236 ) -> QueryExecutePhaseAttribution {
237 QueryExecutePhaseAttribution {
238 runtime_local_instructions: phase.runtime_local_instructions,
239 finalize_local_instructions: phase.finalize_local_instructions,
240 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
241 direct_data_row_key_stream_local_instructions: phase
242 .direct_data_row_key_stream_local_instructions,
243 direct_data_row_row_read_local_instructions: phase
244 .direct_data_row_row_read_local_instructions,
245 direct_data_row_key_encode_local_instructions: phase
246 .direct_data_row_key_encode_local_instructions,
247 direct_data_row_store_get_local_instructions: phase
248 .direct_data_row_store_get_local_instructions,
249 direct_data_row_order_window_local_instructions: phase
250 .direct_data_row_order_window_local_instructions,
251 direct_data_row_page_window_local_instructions: phase
252 .direct_data_row_page_window_local_instructions,
253 grouped_stream_local_instructions: 0,
254 grouped_fold_local_instructions: 0,
255 grouped_finalize_local_instructions: 0,
256 grouped_count: GroupedCountAttribution::none(),
257 }
258 }
259
260 #[cfg(feature = "diagnostics")]
261 const fn grouped_query_execute_phase_attribution(
262 phase: GroupedExecutePhaseAttribution,
263 ) -> QueryExecutePhaseAttribution {
264 QueryExecutePhaseAttribution {
265 runtime_local_instructions: phase
266 .stream_local_instructions
267 .saturating_add(phase.fold_local_instructions),
268 finalize_local_instructions: phase.finalize_local_instructions,
269 direct_data_row_scan_local_instructions: 0,
270 direct_data_row_key_stream_local_instructions: 0,
271 direct_data_row_row_read_local_instructions: 0,
272 direct_data_row_key_encode_local_instructions: 0,
273 direct_data_row_store_get_local_instructions: 0,
274 direct_data_row_order_window_local_instructions: 0,
275 direct_data_row_page_window_local_instructions: 0,
276 grouped_stream_local_instructions: phase.stream_local_instructions,
277 grouped_fold_local_instructions: phase.fold_local_instructions,
278 grouped_finalize_local_instructions: phase.finalize_local_instructions,
279 grouped_count: phase.grouped_count,
280 }
281 }
282
283 fn query_plan_cache_scope_id(&self) -> usize {
284 self.db.cache_scope_id()
285 }
286
287 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
288 let scope_id = self.query_plan_cache_scope_id();
289
290 QUERY_PLAN_CACHES.with(|caches| {
291 let mut caches = caches.borrow_mut();
292 let cache = caches.entry(scope_id).or_default();
293
294 f(cache)
295 })
296 }
297
298 const fn visible_indexes_for_model(
299 model: &'static EntityModel,
300 visibility: QueryPlanVisibility,
301 ) -> VisibleIndexes<'static> {
302 match visibility {
303 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
304 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
305 }
306 }
307
308 #[cfg(test)]
309 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
310 self.with_query_plan_cache(|cache| cache.len())
311 }
312
313 #[cfg(test)]
314 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
315 self.with_query_plan_cache(QueryPlanCache::clear);
316 }
317
318 pub(in crate::db) fn query_plan_visibility_for_store_path(
319 &self,
320 store_path: &'static str,
321 ) -> Result<QueryPlanVisibility, QueryError> {
322 let store = self
323 .db
324 .recovered_store(store_path)
325 .map_err(QueryError::execute)?;
326 let visibility = if store.index_state() == crate::db::IndexState::Ready {
327 QueryPlanVisibility::StoreReady
328 } else {
329 QueryPlanVisibility::StoreNotReady
330 };
331
332 Ok(visibility)
333 }
334
335 pub(in crate::db) fn cached_query_plan_entry_for_authority(
336 &self,
337 authority: crate::db::executor::EntityAuthority,
338 schema_fingerprint: CommitSchemaFingerprint,
339 query: &StructuralQuery,
340 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
341 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
342 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
343 let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
344 let cache_key = QueryPlanCacheKey::for_authority_with_normalized_predicate(
345 authority,
346 schema_fingerprint,
347 visibility,
348 query,
349 normalized_predicate.as_ref(),
350 );
351
352 {
353 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
354 if let Some(entry) = cached {
355 return Ok((entry, QueryPlanCacheAttribution::hit()));
356 }
357 }
358
359 let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
360 &visible_indexes,
361 normalized_predicate,
362 )?;
363 let entry = QueryPlanCacheEntry::new(
364 plan.clone(),
365 SharedPreparedExecutionPlan::from_plan(authority, plan),
366 );
367 self.with_query_plan_cache(|cache| {
368 cache.insert(cache_key, entry.clone());
369 });
370
371 Ok((entry, QueryPlanCacheAttribution::miss()))
372 }
373
374 #[cfg(test)]
375 pub(in crate::db) fn query_plan_cache_key_for_tests(
376 authority: crate::db::executor::EntityAuthority,
377 schema_fingerprint: CommitSchemaFingerprint,
378 visibility: QueryPlanVisibility,
379 query: &StructuralQuery,
380 cache_method_version: u8,
381 ) -> QueryPlanCacheKey {
382 QueryPlanCacheKey::for_authority_with_method_version(
383 authority,
384 schema_fingerprint,
385 visibility,
386 query,
387 cache_method_version,
388 )
389 }
390
391 pub(in crate::db) fn cached_structural_plan_for_authority(
392 &self,
393 authority: crate::db::executor::EntityAuthority,
394 schema_fingerprint: CommitSchemaFingerprint,
395 query: &StructuralQuery,
396 ) -> Result<AccessPlannedQuery, QueryError> {
397 let (entry, _) =
398 self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
399
400 Ok(entry.logical_plan().clone())
401 }
402
403 fn with_query_visible_indexes<E, T>(
406 &self,
407 query: &Query<E>,
408 op: impl FnOnce(
409 &Query<E>,
410 &crate::db::query::plan::VisibleIndexes<'static>,
411 ) -> Result<T, QueryError>,
412 ) -> Result<T, QueryError>
413 where
414 E: EntityKind<Canister = C>,
415 {
416 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
417 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
418
419 op(query, &visible_indexes)
420 }
421
422 fn cached_structural_plan_for_entity<E>(
425 &self,
426 query: &StructuralQuery,
427 ) -> Result<AccessPlannedQuery, QueryError>
428 where
429 E: EntityKind<Canister = C>,
430 {
431 self.cached_structural_plan_for_authority(
432 crate::db::executor::EntityAuthority::for_type::<E>(),
433 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
434 query,
435 )
436 }
437
438 fn map_cached_structural_plan_for_entity<E, T>(
442 &self,
443 query: &StructuralQuery,
444 map: impl FnOnce(AccessPlannedQuery) -> T,
445 ) -> Result<T, QueryError>
446 where
447 E: EntityKind<Canister = C>,
448 {
449 let plan = self.cached_structural_plan_for_entity::<E>(query)?;
450
451 Ok(map(plan))
452 }
453
454 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
455 &self,
456 query: &StructuralQuery,
457 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
458 where
459 E: EntityKind<Canister = C>,
460 {
461 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
462 crate::db::executor::EntityAuthority::for_type::<E>(),
463 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
464 query,
465 )?;
466
467 Ok((entry.typed_prepared_plan::<E>(), attribution))
468 }
469
470 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
473 &self,
474 query: &Query<E>,
475 ) -> Result<CompiledQuery<E>, QueryError>
476 where
477 E: EntityKind<Canister = C>,
478 {
479 self.map_cached_structural_plan_for_entity::<E, _>(
480 query.structural(),
481 Query::<E>::compiled_query_from_plan,
482 )
483 }
484
485 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
488 &self,
489 query: &Query<E>,
490 ) -> Result<PlannedQuery<E>, QueryError>
491 where
492 E: EntityKind<Canister = C>,
493 {
494 self.map_cached_structural_plan_for_entity::<E, _>(
495 query.structural(),
496 Query::<E>::planned_query_from_plan,
497 )
498 }
499
500 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
502 &self,
503 query: &Query<E>,
504 ) -> Result<ExplainPlan, QueryError>
505 where
506 E: EntityKind<Canister = C>,
507 {
508 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
509 }
510
511 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
514 &self,
515 query: &Query<E>,
516 ) -> Result<String, QueryError>
517 where
518 E: EntityKind<Canister = C>,
519 {
520 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
521 }
522
523 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
526 &self,
527 query: &Query<E>,
528 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
529 where
530 E: EntityValue + EntityKind<Canister = C>,
531 {
532 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
533 }
534
535 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
538 &self,
539 query: &Query<E>,
540 ) -> Result<String, QueryError>
541 where
542 E: EntityValue + EntityKind<Canister = C>,
543 {
544 self.with_query_visible_indexes(
545 query,
546 Query::<E>::explain_execution_verbose_with_visible_indexes,
547 )
548 }
549
550 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
553 &self,
554 query: &Query<E>,
555 strategy: &S,
556 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
557 where
558 E: EntityValue + EntityKind<Canister = C>,
559 S: PreparedFluentAggregateExplainStrategy,
560 {
561 self.with_query_visible_indexes(query, |query, visible_indexes| {
562 query
563 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
564 })
565 }
566
567 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
570 &self,
571 query: &Query<E>,
572 target_field: &str,
573 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
574 where
575 E: EntityValue + EntityKind<Canister = C>,
576 {
577 self.with_query_visible_indexes(query, |query, visible_indexes| {
578 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
579 })
580 }
581
582 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
585 &self,
586 query: &Query<E>,
587 strategy: &PreparedFluentProjectionStrategy,
588 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
589 where
590 E: EntityValue + EntityKind<Canister = C>,
591 {
592 self.with_query_visible_indexes(query, |query, visible_indexes| {
593 query.explain_prepared_projection_terminal_with_visible_indexes(
594 visible_indexes,
595 strategy,
596 )
597 })
598 }
599
600 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
603 match family {
604 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
605 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
606 )),
607 ExecutionFamily::Ordered => Ok(()),
608 ExecutionFamily::Grouped => Err(QueryError::invariant(
609 "grouped queries execute via execute(), not page().execute()",
610 )),
611 }
612 }
613
614 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
617 match family {
618 ExecutionFamily::Grouped => Ok(()),
619 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
620 "grouped execution requires grouped logical plans",
621 )),
622 }
623 }
624
625 fn finalize_grouped_execution_page(
629 page: GroupedCursorPage,
630 trace: Option<ExecutionTrace>,
631 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
632 let next_cursor = page
633 .next_cursor
634 .map(|token| {
635 let Some(token) = token.as_grouped() else {
636 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
637 };
638
639 token.encode().map_err(|err| {
640 QueryError::serialize_internal(format!(
641 "failed to serialize grouped continuation cursor: {err}"
642 ))
643 })
644 })
645 .transpose()?;
646
647 Ok(PagedGroupedExecutionWithTrace::new(
648 page.rows,
649 next_cursor,
650 trace,
651 ))
652 }
653
654 fn execute_grouped_query_result<E>(
658 &self,
659 query: &Query<E>,
660 cursor_token: Option<&str>,
661 ) -> Result<LoadQueryResult<E>, QueryError>
662 where
663 E: PersistedRow<Canister = C> + EntityValue,
664 {
665 self.execute_grouped(query, cursor_token)
666 .map(LoadQueryResult::grouped)
667 }
668
669 #[cfg(feature = "diagnostics")]
673 fn execute_grouped_query_result_with_attribution<E>(
674 &self,
675 plan: PreparedExecutionPlan<E>,
676 ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
677 where
678 E: PersistedRow<Canister = C> + EntityValue,
679 {
680 let (page, trace, phase_attribution) =
681 self.execute_grouped_plan_with_trace_with_phase_attribution(plan, None)?;
682 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
683
684 Ok((
685 LoadQueryResult::grouped(grouped),
686 Self::grouped_query_execute_phase_attribution(phase_attribution),
687 0,
688 ))
689 }
690
691 #[cfg(feature = "diagnostics")]
695 fn execute_scalar_query_result_with_attribution<E>(
696 &self,
697 mode: QueryMode,
698 plan: PreparedExecutionPlan<E>,
699 ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
700 where
701 E: PersistedRow<Canister = C> + EntityValue,
702 {
703 match mode {
704 QueryMode::Load(_) => {
705 let (rows, phase_attribution, response_decode_local_instructions) = self
706 .load_executor::<E>()
707 .execute_with_phase_attribution(plan)
708 .map_err(QueryError::execute)?;
709
710 Ok((
711 LoadQueryResult::rows(rows),
712 Self::scalar_query_execute_phase_attribution(phase_attribution),
713 response_decode_local_instructions,
714 ))
715 }
716 QueryMode::Delete(_) => {
717 let result = self.execute_query_dyn(mode, plan)?;
718
719 Ok((
720 LoadQueryResult::rows(result),
721 Self::empty_query_execute_phase_attribution(),
722 0,
723 ))
724 }
725 }
726 }
727
728 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
730 where
731 E: PersistedRow<Canister = C> + EntityValue,
732 {
733 let mode = query.mode();
735 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
736
737 self.execute_query_dyn(mode, plan)
739 }
740
741 #[cfg(feature = "diagnostics")]
744 #[doc(hidden)]
745 pub fn execute_query_result_with_attribution<E>(
746 &self,
747 query: &Query<E>,
748 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
749 where
750 E: PersistedRow<Canister = C> + EntityValue,
751 {
752 let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
757 self.cached_prepared_query_plan_for_entity::<E>(query.structural())
758 });
759 let (plan, cache_attribution) = plan_and_cache?;
760
761 let (execute_local_instructions, result) = measure_query_stage(|| {
764 if query.has_grouping() {
765 self.execute_grouped_query_result_with_attribution(plan)
766 } else {
767 self.execute_scalar_query_result_with_attribution(query.mode(), plan)
768 }
769 });
770 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
771 let total_local_instructions =
772 compile_local_instructions.saturating_add(execute_local_instructions);
773
774 Ok((
775 result,
776 QueryExecutionAttribution {
777 compile_local_instructions,
778 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
779 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
780 direct_data_row_scan_local_instructions: execute_phase_attribution
781 .direct_data_row_scan_local_instructions,
782 direct_data_row_key_stream_local_instructions: execute_phase_attribution
783 .direct_data_row_key_stream_local_instructions,
784 direct_data_row_row_read_local_instructions: execute_phase_attribution
785 .direct_data_row_row_read_local_instructions,
786 direct_data_row_key_encode_local_instructions: execute_phase_attribution
787 .direct_data_row_key_encode_local_instructions,
788 direct_data_row_store_get_local_instructions: execute_phase_attribution
789 .direct_data_row_store_get_local_instructions,
790 direct_data_row_order_window_local_instructions: execute_phase_attribution
791 .direct_data_row_order_window_local_instructions,
792 direct_data_row_page_window_local_instructions: execute_phase_attribution
793 .direct_data_row_page_window_local_instructions,
794 grouped_stream_local_instructions: execute_phase_attribution
795 .grouped_stream_local_instructions,
796 grouped_fold_local_instructions: execute_phase_attribution
797 .grouped_fold_local_instructions,
798 grouped_finalize_local_instructions: execute_phase_attribution
799 .grouped_finalize_local_instructions,
800 grouped_count_borrowed_hash_computations: execute_phase_attribution
801 .grouped_count
802 .borrowed_hash_computations,
803 grouped_count_bucket_candidate_checks: execute_phase_attribution
804 .grouped_count
805 .bucket_candidate_checks,
806 grouped_count_existing_group_hits: execute_phase_attribution
807 .grouped_count
808 .existing_group_hits,
809 grouped_count_new_group_inserts: execute_phase_attribution
810 .grouped_count
811 .new_group_inserts,
812 grouped_count_row_materialization_local_instructions: execute_phase_attribution
813 .grouped_count
814 .row_materialization_local_instructions,
815 grouped_count_group_lookup_local_instructions: execute_phase_attribution
816 .grouped_count
817 .group_lookup_local_instructions,
818 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
819 .grouped_count
820 .existing_group_update_local_instructions,
821 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
822 .grouped_count
823 .new_group_insert_local_instructions,
824 response_decode_local_instructions,
825 execute_local_instructions,
826 total_local_instructions,
827 shared_query_plan_cache_hits: cache_attribution.hits,
828 shared_query_plan_cache_misses: cache_attribution.misses,
829 },
830 ))
831 }
832
833 #[doc(hidden)]
836 pub fn execute_query_result<E>(
837 &self,
838 query: &Query<E>,
839 ) -> Result<LoadQueryResult<E>, QueryError>
840 where
841 E: PersistedRow<Canister = C> + EntityValue,
842 {
843 if query.has_grouping() {
844 return self.execute_grouped_query_result(query, None);
845 }
846
847 self.execute_query(query).map(LoadQueryResult::rows)
848 }
849
850 #[doc(hidden)]
852 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
853 where
854 E: PersistedRow<Canister = C> + EntityValue,
855 {
856 if !query.mode().is_delete() {
858 return Err(QueryError::unsupported_query(
859 "delete count execution requires delete query mode",
860 ));
861 }
862
863 let plan = self
865 .compile_query_with_visible_indexes(query)?
866 .into_prepared_execution_plan();
867
868 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
870 .map_err(QueryError::execute)
871 }
872
873 pub(in crate::db) fn execute_query_dyn<E>(
878 &self,
879 mode: QueryMode,
880 plan: PreparedExecutionPlan<E>,
881 ) -> Result<EntityResponse<E>, QueryError>
882 where
883 E: PersistedRow<Canister = C> + EntityValue,
884 {
885 let result = match mode {
886 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
887 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
888 };
889
890 result.map_err(QueryError::execute)
891 }
892
893 pub(in crate::db) fn execute_load_query_with<E, T>(
896 &self,
897 query: &Query<E>,
898 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
899 ) -> Result<T, QueryError>
900 where
901 E: PersistedRow<Canister = C> + EntityValue,
902 {
903 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
904
905 self.with_metrics(|| op(self.load_executor::<E>(), plan))
906 .map_err(QueryError::execute)
907 }
908
909 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
914 where
915 E: EntityKind<Canister = C>,
916 {
917 let compiled = self.compile_query_with_visible_indexes(query)?;
918 let explain = compiled.explain();
919 let plan_hash = compiled.plan_hash_hex();
920
921 let (executable, _) =
922 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
923 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
924 let execution_family = match query.mode() {
925 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
926 QueryMode::Delete(_) => None,
927 };
928
929 Ok(QueryTracePlan::new(
930 plan_hash,
931 access_strategy,
932 execution_family,
933 explain,
934 ))
935 }
936
937 pub(crate) fn execute_load_query_paged_with_trace<E>(
939 &self,
940 query: &Query<E>,
941 cursor_token: Option<&str>,
942 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
943 where
944 E: PersistedRow<Canister = C> + EntityValue,
945 {
946 let plan = self
948 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
949 .0;
950 Self::ensure_scalar_paged_execution_family(
951 plan.execution_family().map_err(QueryError::execute)?,
952 )?;
953
954 let cursor_bytes = decode_optional_cursor_token(cursor_token)
956 .map_err(QueryError::from_cursor_plan_error)?;
957 let cursor = plan
958 .prepare_cursor(cursor_bytes.as_deref())
959 .map_err(QueryError::from_executor_plan_error)?;
960
961 let (page, trace) = self
963 .with_metrics(|| {
964 self.load_executor::<E>()
965 .execute_paged_with_cursor_traced(plan, cursor)
966 })
967 .map_err(QueryError::execute)?;
968 let next_cursor = page
969 .next_cursor
970 .map(|token| {
971 let Some(token) = token.as_scalar() else {
972 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
973 };
974
975 token.encode().map_err(|err| {
976 QueryError::serialize_internal(format!(
977 "failed to serialize continuation cursor: {err}"
978 ))
979 })
980 })
981 .transpose()?;
982
983 Ok(PagedLoadExecutionWithTrace::new(
984 page.items,
985 next_cursor,
986 trace,
987 ))
988 }
989
990 pub(in crate::db) fn execute_grouped<E>(
995 &self,
996 query: &Query<E>,
997 cursor_token: Option<&str>,
998 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
999 where
1000 E: PersistedRow<Canister = C> + EntityValue,
1001 {
1002 let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
1003 Self::finalize_grouped_execution_page(page, trace)
1004 }
1005
1006 fn execute_grouped_page_with_trace<E>(
1009 &self,
1010 query: &Query<E>,
1011 cursor_token: Option<&str>,
1012 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1013 where
1014 E: PersistedRow<Canister = C> + EntityValue,
1015 {
1016 let plan = self
1018 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
1019 .0;
1020
1021 self.execute_grouped_plan_with_trace(plan, cursor_token)
1023 }
1024
1025 fn execute_grouped_plan_with_trace<E>(
1027 &self,
1028 plan: PreparedExecutionPlan<E>,
1029 cursor_token: Option<&str>,
1030 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1031 where
1032 E: PersistedRow<Canister = C> + EntityValue,
1033 {
1034 Self::ensure_grouped_execution_family(
1036 plan.execution_family().map_err(QueryError::execute)?,
1037 )?;
1038
1039 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1041 .map_err(QueryError::from_cursor_plan_error)?;
1042 let cursor = plan
1043 .prepare_grouped_cursor_token(cursor)
1044 .map_err(QueryError::from_executor_plan_error)?;
1045
1046 self.with_metrics(|| {
1049 self.load_executor::<E>()
1050 .execute_grouped_paged_with_cursor_traced(plan, cursor)
1051 })
1052 .map_err(QueryError::execute)
1053 }
1054
1055 #[cfg(feature = "diagnostics")]
1056 fn execute_grouped_plan_with_trace_with_phase_attribution<E>(
1057 &self,
1058 plan: PreparedExecutionPlan<E>,
1059 cursor_token: Option<&str>,
1060 ) -> Result<
1061 (
1062 GroupedCursorPage,
1063 Option<ExecutionTrace>,
1064 GroupedExecutePhaseAttribution,
1065 ),
1066 QueryError,
1067 >
1068 where
1069 E: PersistedRow<Canister = C> + EntityValue,
1070 {
1071 Self::ensure_grouped_execution_family(
1072 plan.execution_family().map_err(QueryError::execute)?,
1073 )?;
1074
1075 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1076 .map_err(QueryError::from_cursor_plan_error)?;
1077 let cursor = plan
1078 .prepare_grouped_cursor_token(cursor)
1079 .map_err(QueryError::from_executor_plan_error)?;
1080
1081 self.with_metrics(|| {
1082 self.load_executor::<E>()
1083 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(plan, cursor)
1084 })
1085 .map_err(QueryError::execute)
1086 }
1087}
1088
1089impl QueryPlanCacheKey {
1090 fn for_authority_with_normalized_predicate(
1091 authority: crate::db::executor::EntityAuthority,
1092 schema_fingerprint: CommitSchemaFingerprint,
1093 visibility: QueryPlanVisibility,
1094 query: &StructuralQuery,
1095 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1096 ) -> Self {
1097 Self::for_authority_with_normalized_predicate_and_method_version(
1098 authority,
1099 schema_fingerprint,
1100 visibility,
1101 query,
1102 normalized_predicate,
1103 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
1104 )
1105 }
1106
1107 const fn from_authority_parts(
1111 authority: crate::db::executor::EntityAuthority,
1112 schema_fingerprint: CommitSchemaFingerprint,
1113 visibility: QueryPlanVisibility,
1114 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1115 cache_method_version: u8,
1116 ) -> Self {
1117 Self {
1118 cache_method_version,
1119 entity_path: authority.entity_path(),
1120 schema_fingerprint,
1121 visibility,
1122 structural_query,
1123 }
1124 }
1125
1126 #[cfg(test)]
1127 fn for_authority_with_method_version(
1128 authority: crate::db::executor::EntityAuthority,
1129 schema_fingerprint: CommitSchemaFingerprint,
1130 visibility: QueryPlanVisibility,
1131 query: &StructuralQuery,
1132 cache_method_version: u8,
1133 ) -> Self {
1134 Self::from_authority_parts(
1135 authority,
1136 schema_fingerprint,
1137 visibility,
1138 query.structural_cache_key(),
1139 cache_method_version,
1140 )
1141 }
1142
1143 fn for_authority_with_normalized_predicate_and_method_version(
1144 authority: crate::db::executor::EntityAuthority,
1145 schema_fingerprint: CommitSchemaFingerprint,
1146 visibility: QueryPlanVisibility,
1147 query: &StructuralQuery,
1148 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1149 cache_method_version: u8,
1150 ) -> Self {
1151 Self::from_authority_parts(
1152 authority,
1153 schema_fingerprint,
1154 visibility,
1155 query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1156 cache_method_version,
1157 )
1158 }
1159}