1#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12 db::{
13 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15 access::AccessStrategy,
16 commit::CommitSchemaFingerprint,
17 cursor::{
18 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19 },
20 diagnostics::ExecutionTrace,
21 executor::{
22 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23 SharedPreparedExecutionPlan,
24 },
25 query::builder::{
26 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
27 },
28 query::explain::{
29 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
30 },
31 query::{
32 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
33 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
34 },
35 },
36 error::InternalError,
37 model::entity::EntityModel,
38 traits::{CanisterKind, EntityKind, EntityValue, Path},
39};
40#[cfg(feature = "diagnostics")]
41use candid::CandidType;
42use icydb_utils::Xxh3;
43#[cfg(feature = "diagnostics")]
44use serde::Deserialize;
45use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
46
/// Hasher builder used by the plan-cache maps in this file (`Xxh3` via `BuildHasherDefault`).
type CacheBuildHasher = BuildHasherDefault<Xxh3>;

/// Version tag passed into the shared plan-cache key built by
/// `cached_query_plan_entry_for_authority`; it becomes the key's `cache_method_version`.
// NOTE(review): presumably bumped whenever the key derivation changes — confirm.
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
52
/// Index visibility used when planning a query.
///
/// `StoreReady` is chosen when the recovered store reports `IndexState::Ready`; any other
/// index state maps to `StoreNotReady`. The value is part of the plan-cache key and decides
/// whether the planner sees the model's indexes or none at all.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
    /// The store's index state is not `Ready`; planning runs with no visible indexes.
    StoreNotReady,
    /// The store's index state is `Ready`; planning sees the model's planner-visible indexes.
    StoreReady,
}
58
/// Key identifying one entry of the shared query-plan cache.
///
/// Two lookups share an entry only when every component below is equal.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
    // Version tag of the key derivation supplied by the caller.
    cache_method_version: u8,
    // Entity path taken from the `EntityAuthority` the key was built for.
    entity_path: &'static str,
    // Schema fingerprint supplied by the caller for the entity.
    schema_fingerprint: CommitSchemaFingerprint,
    // Index visibility the plan was (or will be) built under.
    visibility: QueryPlanVisibility,
    // Structural cache key derived from the `StructuralQuery`.
    structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
67
/// Value stored in the shared query-plan cache: the logical plan together with the
/// shared prepared execution plan built from it.
#[derive(Clone, Debug)]
pub(in crate::db) struct QueryPlanCacheEntry {
    logical_plan: AccessPlannedQuery,
    prepared_plan: SharedPreparedExecutionPlan,
}

impl QueryPlanCacheEntry {
    /// Bundles a logical plan and its prepared plan into one cache entry.
    #[must_use]
    pub(in crate::db) const fn new(
        logical_plan: AccessPlannedQuery,
        prepared_plan: SharedPreparedExecutionPlan,
    ) -> Self {
        Self {
            logical_plan,
            prepared_plan,
        }
    }

    /// Borrows the cached logical plan.
    #[must_use]
    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
        &self.logical_plan
    }

    /// Returns the result of `typed_clone::<E>()` on the shared prepared plan, i.e. a
    /// prepared plan typed for entity `E`.
    #[must_use]
    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
        self.prepared_plan.typed_clone::<E>()
    }

    /// Borrows the cached shared prepared plan.
    #[must_use]
    pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
        &self.prepared_plan
    }
}
101
/// Plan cache for one cache scope: cache key to cached plan entry.
pub(in crate::db) type QueryPlanCache =
    HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;

// Thread-local registry of plan caches, keyed by the scope id that
// `DbSession::with_query_plan_cache` reads from `self.db.cache_scope_id()`.
// A scope's cache is created empty on first access.
thread_local! {
    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
113
/// Hit/miss counters describing how one plan lookup was served by the shared plan cache.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
    /// Number of lookups answered from the cache.
    pub hits: u64,
    /// Number of lookups that had to build and insert a plan.
    pub misses: u64,
}

impl QueryPlanCacheAttribution {
    /// Attribution for a single cache hit (`hits = 1`, `misses = 0`).
    #[must_use]
    const fn hit() -> Self {
        Self { hits: 1, misses: 0 }
    }

    /// Attribution for a single cache miss (`hits = 0`, `misses = 1`).
    #[must_use]
    const fn miss() -> Self {
        Self { hits: 0, misses: 1 }
    }
}
131
/// Diagnostics-only breakdown of one query execution, as produced by
/// `DbSession::execute_query_result_with_attribution`.
///
/// The `*_local_instructions` values are deltas of the local instruction counter, which
/// reads as `0` on non-`wasm32` targets.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    // Measured around resolving the (possibly cached) prepared plan.
    pub compile_local_instructions: u64,
    // Copied from the executor's phase attribution.
    pub runtime_local_instructions: u64,
    pub finalize_local_instructions: u64,
    // Scalar direct-data-row counters; zero for grouped execution.
    pub direct_data_row_scan_local_instructions: u64,
    pub direct_data_row_key_stream_local_instructions: u64,
    pub direct_data_row_row_read_local_instructions: u64,
    pub direct_data_row_key_encode_local_instructions: u64,
    pub direct_data_row_store_get_local_instructions: u64,
    pub direct_data_row_order_window_local_instructions: u64,
    pub direct_data_row_page_window_local_instructions: u64,
    // Grouped phase counters; zero for scalar execution.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    // Flattened copy of the executor's `GroupedCountAttribution`.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    // Reported by the scalar load executor; `0` for grouped and delete paths.
    pub response_decode_local_instructions: u64,
    // Measured around the whole execute stage.
    pub execute_local_instructions: u64,
    // Saturating sum of the compile and execute measurements.
    pub total_local_instructions: u64,
    // Shared plan-cache outcome of the plan lookup.
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
168
/// Internal, executor-shape-independent phase attribution.
///
/// Scalar and grouped executor attributions are both normalized into this struct before
/// being flattened into `QueryExecutionAttribution`; counters that do not apply to the
/// executed shape are left at zero / `GroupedCountAttribution::none()`.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
    runtime_local_instructions: u64,
    finalize_local_instructions: u64,
    // Populated only from scalar attribution.
    direct_data_row_scan_local_instructions: u64,
    direct_data_row_key_stream_local_instructions: u64,
    direct_data_row_row_read_local_instructions: u64,
    direct_data_row_key_encode_local_instructions: u64,
    direct_data_row_store_get_local_instructions: u64,
    direct_data_row_order_window_local_instructions: u64,
    direct_data_row_page_window_local_instructions: u64,
    // Populated only from grouped attribution.
    grouped_stream_local_instructions: u64,
    grouped_fold_local_instructions: u64,
    grouped_finalize_local_instructions: u64,
    grouped_count: GroupedCountAttribution,
}
186
/// Reads the local instruction counter used for diagnostics attribution.
///
/// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`. On every other
/// target it returns `0`, so any delta computed from it is also `0`.
#[cfg(feature = "diagnostics")]
// Only the non-wasm32 body is const-eligible, so the lint expectation is limited to those
// targets; applied unconditionally it would presumably be reported as unfulfilled when
// linting a wasm32 build, where the body calls a non-const function.
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
203
/// Runs `run` and reports how far the local instruction counter advanced while it ran.
///
/// Returns `(delta, result)`; the delta is computed with `saturating_sub`, and the closure's
/// `Result` is handed back untouched so the caller decides when to propagate an error.
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
212
213impl<C: CanisterKind> DbSession<C> {
    /// Phase attribution with every counter at zero and `GroupedCountAttribution::none()`.
    ///
    /// Used for execution paths that report no phase breakdown (the delete path in
    /// `execute_scalar_query_result_with_attribution`).
    #[cfg(feature = "diagnostics")]
    const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
        QueryExecutePhaseAttribution {
            runtime_local_instructions: 0,
            finalize_local_instructions: 0,
            direct_data_row_scan_local_instructions: 0,
            direct_data_row_key_stream_local_instructions: 0,
            direct_data_row_row_read_local_instructions: 0,
            direct_data_row_key_encode_local_instructions: 0,
            direct_data_row_store_get_local_instructions: 0,
            direct_data_row_order_window_local_instructions: 0,
            direct_data_row_page_window_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
232
233 #[cfg(feature = "diagnostics")]
234 const fn scalar_query_execute_phase_attribution(
235 phase: ScalarExecutePhaseAttribution,
236 ) -> QueryExecutePhaseAttribution {
237 QueryExecutePhaseAttribution {
238 runtime_local_instructions: phase.runtime_local_instructions,
239 finalize_local_instructions: phase.finalize_local_instructions,
240 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
241 direct_data_row_key_stream_local_instructions: phase
242 .direct_data_row_key_stream_local_instructions,
243 direct_data_row_row_read_local_instructions: phase
244 .direct_data_row_row_read_local_instructions,
245 direct_data_row_key_encode_local_instructions: phase
246 .direct_data_row_key_encode_local_instructions,
247 direct_data_row_store_get_local_instructions: phase
248 .direct_data_row_store_get_local_instructions,
249 direct_data_row_order_window_local_instructions: phase
250 .direct_data_row_order_window_local_instructions,
251 direct_data_row_page_window_local_instructions: phase
252 .direct_data_row_page_window_local_instructions,
253 grouped_stream_local_instructions: 0,
254 grouped_fold_local_instructions: 0,
255 grouped_finalize_local_instructions: 0,
256 grouped_count: GroupedCountAttribution::none(),
257 }
258 }
259
260 #[cfg(feature = "diagnostics")]
261 const fn grouped_query_execute_phase_attribution(
262 phase: GroupedExecutePhaseAttribution,
263 ) -> QueryExecutePhaseAttribution {
264 QueryExecutePhaseAttribution {
265 runtime_local_instructions: phase
266 .stream_local_instructions
267 .saturating_add(phase.fold_local_instructions),
268 finalize_local_instructions: phase.finalize_local_instructions,
269 direct_data_row_scan_local_instructions: 0,
270 direct_data_row_key_stream_local_instructions: 0,
271 direct_data_row_row_read_local_instructions: 0,
272 direct_data_row_key_encode_local_instructions: 0,
273 direct_data_row_store_get_local_instructions: 0,
274 direct_data_row_order_window_local_instructions: 0,
275 direct_data_row_page_window_local_instructions: 0,
276 grouped_stream_local_instructions: phase.stream_local_instructions,
277 grouped_fold_local_instructions: phase.fold_local_instructions,
278 grouped_finalize_local_instructions: phase.finalize_local_instructions,
279 grouped_count: phase.grouped_count,
280 }
281 }
282
283 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
284 let scope_id = self.db.cache_scope_id();
285
286 QUERY_PLAN_CACHES.with(|caches| {
287 let mut caches = caches.borrow_mut();
288 let cache = caches.entry(scope_id).or_default();
289
290 f(cache)
291 })
292 }
293
294 const fn visible_indexes_for_model(
295 model: &'static EntityModel,
296 visibility: QueryPlanVisibility,
297 ) -> VisibleIndexes<'static> {
298 match visibility {
299 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
300 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
301 }
302 }
303
304 #[cfg(test)]
305 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
306 self.with_query_plan_cache(|cache| cache.len())
307 }
308
309 #[cfg(test)]
310 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
311 self.with_query_plan_cache(QueryPlanCache::clear);
312 }
313
314 pub(in crate::db) fn query_plan_visibility_for_store_path(
315 &self,
316 store_path: &'static str,
317 ) -> Result<QueryPlanVisibility, QueryError> {
318 let store = self
319 .db
320 .recovered_store(store_path)
321 .map_err(QueryError::execute)?;
322 let visibility = if store.index_state() == crate::db::IndexState::Ready {
323 QueryPlanVisibility::StoreReady
324 } else {
325 QueryPlanVisibility::StoreNotReady
326 };
327
328 Ok(visibility)
329 }
330
331 pub(in crate::db) fn cached_query_plan_entry_for_authority(
332 &self,
333 authority: crate::db::executor::EntityAuthority,
334 schema_fingerprint: CommitSchemaFingerprint,
335 query: &StructuralQuery,
336 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
337 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
338 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
339 let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
340 let cache_key =
341 QueryPlanCacheKey::for_authority_with_normalized_predicate_and_method_version(
342 authority,
343 schema_fingerprint,
344 visibility,
345 query,
346 normalized_predicate.as_ref(),
347 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
348 );
349
350 {
351 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
352 if let Some(entry) = cached {
353 return Ok((entry, QueryPlanCacheAttribution::hit()));
354 }
355 }
356
357 let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
358 &visible_indexes,
359 normalized_predicate,
360 )?;
361 let entry = QueryPlanCacheEntry::new(
362 plan.clone(),
363 SharedPreparedExecutionPlan::from_plan(authority, plan),
364 );
365 self.with_query_plan_cache(|cache| {
366 cache.insert(cache_key, entry.clone());
367 });
368
369 Ok((entry, QueryPlanCacheAttribution::miss()))
370 }
371
    /// Test helper: builds a cache key from the query's plain structural key with an
    /// explicit `cache_method_version`, forwarding to
    /// `QueryPlanCacheKey::for_authority_with_method_version`.
    #[cfg(test)]
    pub(in crate::db) fn query_plan_cache_key_for_tests(
        authority: crate::db::executor::EntityAuthority,
        schema_fingerprint: CommitSchemaFingerprint,
        visibility: QueryPlanVisibility,
        query: &StructuralQuery,
        cache_method_version: u8,
    ) -> QueryPlanCacheKey {
        QueryPlanCacheKey::for_authority_with_method_version(
            authority,
            schema_fingerprint,
            visibility,
            query,
            cache_method_version,
        )
    }
388
389 fn with_query_visible_indexes<E, T>(
392 &self,
393 query: &Query<E>,
394 op: impl FnOnce(
395 &Query<E>,
396 &crate::db::query::plan::VisibleIndexes<'static>,
397 ) -> Result<T, QueryError>,
398 ) -> Result<T, QueryError>
399 where
400 E: EntityKind<Canister = C>,
401 {
402 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
403 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
404
405 op(query, &visible_indexes)
406 }
407
408 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
409 &self,
410 query: &StructuralQuery,
411 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
412 where
413 E: EntityKind<Canister = C>,
414 {
415 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
416 crate::db::executor::EntityAuthority::for_type::<E>(),
417 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
418 query,
419 )?;
420
421 Ok((entry.typed_prepared_plan::<E>(), attribution))
422 }
423
424 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
427 &self,
428 query: &Query<E>,
429 ) -> Result<CompiledQuery<E>, QueryError>
430 where
431 E: EntityKind<Canister = C>,
432 {
433 let (entry, _) = self.cached_query_plan_entry_for_authority(
434 crate::db::executor::EntityAuthority::for_type::<E>(),
435 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
436 query.structural(),
437 )?;
438
439 Ok(Query::<E>::compiled_query_from_plan(
440 entry.logical_plan().clone(),
441 ))
442 }
443
444 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
447 &self,
448 query: &Query<E>,
449 ) -> Result<PlannedQuery<E>, QueryError>
450 where
451 E: EntityKind<Canister = C>,
452 {
453 let (entry, _) = self.cached_query_plan_entry_for_authority(
454 crate::db::executor::EntityAuthority::for_type::<E>(),
455 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
456 query.structural(),
457 )?;
458
459 Ok(Query::<E>::planned_query_from_plan(
460 entry.logical_plan().clone(),
461 ))
462 }
463
464 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
466 &self,
467 query: &Query<E>,
468 ) -> Result<ExplainPlan, QueryError>
469 where
470 E: EntityKind<Canister = C>,
471 {
472 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
473 }
474
475 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
478 &self,
479 query: &Query<E>,
480 ) -> Result<String, QueryError>
481 where
482 E: EntityKind<Canister = C>,
483 {
484 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
485 }
486
487 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
490 &self,
491 query: &Query<E>,
492 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
493 where
494 E: EntityValue + EntityKind<Canister = C>,
495 {
496 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
497 }
498
499 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
502 &self,
503 query: &Query<E>,
504 ) -> Result<String, QueryError>
505 where
506 E: EntityValue + EntityKind<Canister = C>,
507 {
508 self.with_query_visible_indexes(
509 query,
510 Query::<E>::explain_execution_verbose_with_visible_indexes,
511 )
512 }
513
514 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
517 &self,
518 query: &Query<E>,
519 strategy: &S,
520 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
521 where
522 E: EntityValue + EntityKind<Canister = C>,
523 S: PreparedFluentAggregateExplainStrategy,
524 {
525 self.with_query_visible_indexes(query, |query, visible_indexes| {
526 query
527 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
528 })
529 }
530
531 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
534 &self,
535 query: &Query<E>,
536 target_field: &str,
537 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
538 where
539 E: EntityValue + EntityKind<Canister = C>,
540 {
541 self.with_query_visible_indexes(query, |query, visible_indexes| {
542 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
543 })
544 }
545
546 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
549 &self,
550 query: &Query<E>,
551 strategy: &PreparedFluentProjectionStrategy,
552 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
553 where
554 E: EntityValue + EntityKind<Canister = C>,
555 {
556 self.with_query_visible_indexes(query, |query, visible_indexes| {
557 query.explain_prepared_projection_terminal_with_visible_indexes(
558 visible_indexes,
559 strategy,
560 )
561 })
562 }
563
564 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
567 match family {
568 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
569 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
570 )),
571 ExecutionFamily::Ordered => Ok(()),
572 ExecutionFamily::Grouped => Err(QueryError::invariant(
573 "grouped queries execute via execute(), not page().execute()",
574 )),
575 }
576 }
577
578 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
581 match family {
582 ExecutionFamily::Grouped => Ok(()),
583 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
584 "grouped execution requires grouped logical plans",
585 )),
586 }
587 }
588
589 fn finalize_grouped_execution_page(
593 page: GroupedCursorPage,
594 trace: Option<ExecutionTrace>,
595 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
596 let next_cursor = page
597 .next_cursor
598 .map(|token| {
599 let Some(token) = token.as_grouped() else {
600 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
601 };
602
603 token.encode().map_err(|err| {
604 QueryError::serialize_internal(format!(
605 "failed to serialize grouped continuation cursor: {err}"
606 ))
607 })
608 })
609 .transpose()?;
610
611 Ok(PagedGroupedExecutionWithTrace::new(
612 page.rows,
613 next_cursor,
614 trace,
615 ))
616 }
617
618 #[cfg(feature = "diagnostics")]
622 fn execute_grouped_query_result_with_attribution<E>(
623 &self,
624 plan: PreparedExecutionPlan<E>,
625 ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
626 where
627 E: PersistedRow<Canister = C> + EntityValue,
628 {
629 let (page, trace, phase_attribution) =
630 self.execute_grouped_plan_with_trace_with_phase_attribution(plan, None)?;
631 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
632
633 Ok((
634 LoadQueryResult::Grouped(grouped),
635 Self::grouped_query_execute_phase_attribution(phase_attribution),
636 0,
637 ))
638 }
639
640 #[cfg(feature = "diagnostics")]
644 fn execute_scalar_query_result_with_attribution<E>(
645 &self,
646 mode: QueryMode,
647 plan: PreparedExecutionPlan<E>,
648 ) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
649 where
650 E: PersistedRow<Canister = C> + EntityValue,
651 {
652 match mode {
653 QueryMode::Load(_) => {
654 let (rows, phase_attribution, response_decode_local_instructions) = self
655 .load_executor::<E>()
656 .execute_with_phase_attribution(plan)
657 .map_err(QueryError::execute)?;
658
659 Ok((
660 LoadQueryResult::Rows(rows),
661 Self::scalar_query_execute_phase_attribution(phase_attribution),
662 response_decode_local_instructions,
663 ))
664 }
665 QueryMode::Delete(_) => {
666 let result = self.execute_query_dyn(mode, plan)?;
667
668 Ok((
669 LoadQueryResult::Rows(result),
670 Self::empty_query_execute_phase_attribution(),
671 0,
672 ))
673 }
674 }
675 }
676
677 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
679 where
680 E: PersistedRow<Canister = C> + EntityValue,
681 {
682 let mode = query.mode();
684 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
685
686 self.execute_query_dyn(mode, plan)
688 }
689
690 #[cfg(feature = "diagnostics")]
693 #[doc(hidden)]
694 pub fn execute_query_result_with_attribution<E>(
695 &self,
696 query: &Query<E>,
697 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
698 where
699 E: PersistedRow<Canister = C> + EntityValue,
700 {
701 let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
706 self.cached_prepared_query_plan_for_entity::<E>(query.structural())
707 });
708 let (plan, cache_attribution) = plan_and_cache?;
709
710 let (execute_local_instructions, result) = measure_query_stage(|| {
713 if query.has_grouping() {
714 self.execute_grouped_query_result_with_attribution(plan)
715 } else {
716 self.execute_scalar_query_result_with_attribution(query.mode(), plan)
717 }
718 });
719 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
720 let total_local_instructions =
721 compile_local_instructions.saturating_add(execute_local_instructions);
722
723 Ok((
724 result,
725 QueryExecutionAttribution {
726 compile_local_instructions,
727 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
728 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
729 direct_data_row_scan_local_instructions: execute_phase_attribution
730 .direct_data_row_scan_local_instructions,
731 direct_data_row_key_stream_local_instructions: execute_phase_attribution
732 .direct_data_row_key_stream_local_instructions,
733 direct_data_row_row_read_local_instructions: execute_phase_attribution
734 .direct_data_row_row_read_local_instructions,
735 direct_data_row_key_encode_local_instructions: execute_phase_attribution
736 .direct_data_row_key_encode_local_instructions,
737 direct_data_row_store_get_local_instructions: execute_phase_attribution
738 .direct_data_row_store_get_local_instructions,
739 direct_data_row_order_window_local_instructions: execute_phase_attribution
740 .direct_data_row_order_window_local_instructions,
741 direct_data_row_page_window_local_instructions: execute_phase_attribution
742 .direct_data_row_page_window_local_instructions,
743 grouped_stream_local_instructions: execute_phase_attribution
744 .grouped_stream_local_instructions,
745 grouped_fold_local_instructions: execute_phase_attribution
746 .grouped_fold_local_instructions,
747 grouped_finalize_local_instructions: execute_phase_attribution
748 .grouped_finalize_local_instructions,
749 grouped_count_borrowed_hash_computations: execute_phase_attribution
750 .grouped_count
751 .borrowed_hash_computations,
752 grouped_count_bucket_candidate_checks: execute_phase_attribution
753 .grouped_count
754 .bucket_candidate_checks,
755 grouped_count_existing_group_hits: execute_phase_attribution
756 .grouped_count
757 .existing_group_hits,
758 grouped_count_new_group_inserts: execute_phase_attribution
759 .grouped_count
760 .new_group_inserts,
761 grouped_count_row_materialization_local_instructions: execute_phase_attribution
762 .grouped_count
763 .row_materialization_local_instructions,
764 grouped_count_group_lookup_local_instructions: execute_phase_attribution
765 .grouped_count
766 .group_lookup_local_instructions,
767 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
768 .grouped_count
769 .existing_group_update_local_instructions,
770 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
771 .grouped_count
772 .new_group_insert_local_instructions,
773 response_decode_local_instructions,
774 execute_local_instructions,
775 total_local_instructions,
776 shared_query_plan_cache_hits: cache_attribution.hits,
777 shared_query_plan_cache_misses: cache_attribution.misses,
778 },
779 ))
780 }
781
782 #[doc(hidden)]
785 pub fn execute_query_result<E>(
786 &self,
787 query: &Query<E>,
788 ) -> Result<LoadQueryResult<E>, QueryError>
789 where
790 E: PersistedRow<Canister = C> + EntityValue,
791 {
792 if query.has_grouping() {
793 return self
794 .execute_grouped(query, None)
795 .map(LoadQueryResult::Grouped);
796 }
797
798 self.execute_query(query).map(LoadQueryResult::Rows)
799 }
800
801 #[doc(hidden)]
803 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
804 where
805 E: PersistedRow<Canister = C> + EntityValue,
806 {
807 if !query.mode().is_delete() {
809 return Err(QueryError::unsupported_query(
810 "delete count execution requires delete query mode",
811 ));
812 }
813
814 let plan = self
816 .compile_query_with_visible_indexes(query)?
817 .into_prepared_execution_plan();
818
819 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
821 .map_err(QueryError::execute)
822 }
823
824 pub(in crate::db) fn execute_query_dyn<E>(
829 &self,
830 mode: QueryMode,
831 plan: PreparedExecutionPlan<E>,
832 ) -> Result<EntityResponse<E>, QueryError>
833 where
834 E: PersistedRow<Canister = C> + EntityValue,
835 {
836 let result = match mode {
837 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
838 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
839 };
840
841 result.map_err(QueryError::execute)
842 }
843
844 pub(in crate::db) fn execute_load_query_with<E, T>(
847 &self,
848 query: &Query<E>,
849 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
850 ) -> Result<T, QueryError>
851 where
852 E: PersistedRow<Canister = C> + EntityValue,
853 {
854 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
855
856 self.with_metrics(|| op(self.load_executor::<E>(), plan))
857 .map_err(QueryError::execute)
858 }
859
860 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
865 where
866 E: EntityKind<Canister = C>,
867 {
868 let compiled = self.compile_query_with_visible_indexes(query)?;
869 let explain = compiled.explain();
870 let plan_hash = compiled.plan_hash_hex();
871
872 let (executable, _) =
873 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
874 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
875 let execution_family = match query.mode() {
876 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
877 QueryMode::Delete(_) => None,
878 };
879
880 Ok(QueryTracePlan::new(
881 plan_hash,
882 access_strategy,
883 execution_family,
884 explain,
885 ))
886 }
887
888 pub(crate) fn execute_load_query_paged_with_trace<E>(
890 &self,
891 query: &Query<E>,
892 cursor_token: Option<&str>,
893 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
894 where
895 E: PersistedRow<Canister = C> + EntityValue,
896 {
897 let plan = self
899 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
900 .0;
901 Self::ensure_scalar_paged_execution_family(
902 plan.execution_family().map_err(QueryError::execute)?,
903 )?;
904
905 let cursor_bytes = decode_optional_cursor_token(cursor_token)
907 .map_err(QueryError::from_cursor_plan_error)?;
908 let cursor = plan
909 .prepare_cursor(cursor_bytes.as_deref())
910 .map_err(QueryError::from_executor_plan_error)?;
911
912 let (page, trace) = self
914 .with_metrics(|| {
915 self.load_executor::<E>()
916 .execute_paged_with_cursor_traced(plan, cursor)
917 })
918 .map_err(QueryError::execute)?;
919 let next_cursor = page
920 .next_cursor
921 .map(|token| {
922 let Some(token) = token.as_scalar() else {
923 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
924 };
925
926 token.encode().map_err(|err| {
927 QueryError::serialize_internal(format!(
928 "failed to serialize continuation cursor: {err}"
929 ))
930 })
931 })
932 .transpose()?;
933
934 Ok(PagedLoadExecutionWithTrace::new(
935 page.items,
936 next_cursor,
937 trace,
938 ))
939 }
940
941 pub(in crate::db) fn execute_grouped<E>(
946 &self,
947 query: &Query<E>,
948 cursor_token: Option<&str>,
949 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
950 where
951 E: PersistedRow<Canister = C> + EntityValue,
952 {
953 let plan = self
955 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
956 .0;
957
958 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
961
962 Self::finalize_grouped_execution_page(page, trace)
963 }
964
965 fn execute_grouped_plan_with_trace<E>(
967 &self,
968 plan: PreparedExecutionPlan<E>,
969 cursor_token: Option<&str>,
970 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
971 where
972 E: PersistedRow<Canister = C> + EntityValue,
973 {
974 Self::ensure_grouped_execution_family(
976 plan.execution_family().map_err(QueryError::execute)?,
977 )?;
978
979 let cursor = decode_optional_grouped_cursor_token(cursor_token)
981 .map_err(QueryError::from_cursor_plan_error)?;
982 let cursor = plan
983 .prepare_grouped_cursor_token(cursor)
984 .map_err(QueryError::from_executor_plan_error)?;
985
986 self.with_metrics(|| {
989 self.load_executor::<E>()
990 .execute_grouped_paged_with_cursor_traced(plan, cursor)
991 })
992 .map_err(QueryError::execute)
993 }
994
995 #[cfg(feature = "diagnostics")]
996 fn execute_grouped_plan_with_trace_with_phase_attribution<E>(
997 &self,
998 plan: PreparedExecutionPlan<E>,
999 cursor_token: Option<&str>,
1000 ) -> Result<
1001 (
1002 GroupedCursorPage,
1003 Option<ExecutionTrace>,
1004 GroupedExecutePhaseAttribution,
1005 ),
1006 QueryError,
1007 >
1008 where
1009 E: PersistedRow<Canister = C> + EntityValue,
1010 {
1011 Self::ensure_grouped_execution_family(
1012 plan.execution_family().map_err(QueryError::execute)?,
1013 )?;
1014
1015 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1016 .map_err(QueryError::from_cursor_plan_error)?;
1017 let cursor = plan
1018 .prepare_grouped_cursor_token(cursor)
1019 .map_err(QueryError::from_executor_plan_error)?;
1020
1021 self.with_metrics(|| {
1022 self.load_executor::<E>()
1023 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(plan, cursor)
1024 })
1025 .map_err(QueryError::execute)
1026 }
1027}
1028
1029impl QueryPlanCacheKey {
1030 const fn from_authority_parts(
1034 authority: crate::db::executor::EntityAuthority,
1035 schema_fingerprint: CommitSchemaFingerprint,
1036 visibility: QueryPlanVisibility,
1037 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1038 cache_method_version: u8,
1039 ) -> Self {
1040 Self {
1041 cache_method_version,
1042 entity_path: authority.entity_path(),
1043 schema_fingerprint,
1044 visibility,
1045 structural_query,
1046 }
1047 }
1048
1049 #[cfg(test)]
1050 fn for_authority_with_method_version(
1051 authority: crate::db::executor::EntityAuthority,
1052 schema_fingerprint: CommitSchemaFingerprint,
1053 visibility: QueryPlanVisibility,
1054 query: &StructuralQuery,
1055 cache_method_version: u8,
1056 ) -> Self {
1057 Self::from_authority_parts(
1058 authority,
1059 schema_fingerprint,
1060 visibility,
1061 query.structural_cache_key(),
1062 cache_method_version,
1063 )
1064 }
1065
1066 fn for_authority_with_normalized_predicate_and_method_version(
1067 authority: crate::db::executor::EntityAuthority,
1068 schema_fingerprint: CommitSchemaFingerprint,
1069 visibility: QueryPlanVisibility,
1070 query: &StructuralQuery,
1071 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1072 cache_method_version: u8,
1073 ) -> Self {
1074 Self::from_authority_parts(
1075 authority,
1076 schema_fingerprint,
1077 visibility,
1078 query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1079 cache_method_version,
1080 )
1081 }
1082}