1#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12 db::{
13 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15 access::AccessStrategy,
16 commit::CommitSchemaFingerprint,
17 cursor::{
18 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19 },
20 diagnostics::ExecutionTrace,
21 executor::{
22 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23 SharedPreparedExecutionPlan,
24 },
25 predicate::predicate_fingerprint_normalized,
26 query::builder::{
27 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
28 },
29 query::explain::{
30 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
31 },
32 query::{
33 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
34 plan::{QueryMode, VisibleIndexes},
35 },
36 },
37 error::InternalError,
38 model::entity::EntityModel,
39 traits::{CanisterKind, EntityKind, EntityValue, Path},
40};
41#[cfg(feature = "diagnostics")]
42use candid::CandidType;
43use icydb_utils::Xxh3;
44#[cfg(feature = "diagnostics")]
45use serde::Deserialize;
46use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
47
48type CacheBuildHasher = BuildHasherDefault<Xxh3>;
49
50const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
53
54#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
55pub(in crate::db) enum QueryPlanVisibility {
56 StoreNotReady,
57 StoreReady,
58}
59
60#[derive(Clone, Debug, Eq, Hash, PartialEq)]
61pub(in crate::db) struct QueryPlanCacheKey {
62 cache_method_version: u8,
63 entity_path: &'static str,
64 schema_fingerprint: CommitSchemaFingerprint,
65 visibility: QueryPlanVisibility,
66 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
67}
68
69pub(in crate::db) type QueryPlanCache =
70 HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan, CacheBuildHasher>;
71
72thread_local! {
73 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
78 RefCell::new(HashMap::default());
79}
80
81#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
82pub(in crate::db) struct QueryPlanCacheAttribution {
83 pub hits: u64,
84 pub misses: u64,
85}
86
87impl QueryPlanCacheAttribution {
88 #[must_use]
89 const fn hit() -> Self {
90 Self { hits: 1, misses: 0 }
91 }
92
93 #[must_use]
94 const fn miss() -> Self {
95 Self { hits: 0, misses: 1 }
96 }
97}
98
99#[cfg(feature = "diagnostics")]
106#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
107pub struct QueryExecutionAttribution {
108 pub compile_local_instructions: u64,
109 pub runtime_local_instructions: u64,
110 pub finalize_local_instructions: u64,
111 pub direct_data_row_scan_local_instructions: u64,
112 pub direct_data_row_key_stream_local_instructions: u64,
113 pub direct_data_row_row_read_local_instructions: u64,
114 pub direct_data_row_key_encode_local_instructions: u64,
115 pub direct_data_row_store_get_local_instructions: u64,
116 pub direct_data_row_order_window_local_instructions: u64,
117 pub direct_data_row_page_window_local_instructions: u64,
118 pub grouped_stream_local_instructions: u64,
119 pub grouped_fold_local_instructions: u64,
120 pub grouped_finalize_local_instructions: u64,
121 pub grouped_count_borrowed_hash_computations: u64,
122 pub grouped_count_bucket_candidate_checks: u64,
123 pub grouped_count_existing_group_hits: u64,
124 pub grouped_count_new_group_inserts: u64,
125 pub grouped_count_row_materialization_local_instructions: u64,
126 pub grouped_count_group_lookup_local_instructions: u64,
127 pub grouped_count_existing_group_update_local_instructions: u64,
128 pub grouped_count_new_group_insert_local_instructions: u64,
129 pub response_decode_local_instructions: u64,
130 pub execute_local_instructions: u64,
131 pub total_local_instructions: u64,
132 pub shared_query_plan_cache_hits: u64,
133 pub shared_query_plan_cache_misses: u64,
134}
135
136#[cfg(feature = "diagnostics")]
137#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
138struct QueryExecutePhaseAttribution {
139 runtime_local_instructions: u64,
140 finalize_local_instructions: u64,
141 direct_data_row_scan_local_instructions: u64,
142 direct_data_row_key_stream_local_instructions: u64,
143 direct_data_row_row_read_local_instructions: u64,
144 direct_data_row_key_encode_local_instructions: u64,
145 direct_data_row_store_get_local_instructions: u64,
146 direct_data_row_order_window_local_instructions: u64,
147 direct_data_row_page_window_local_instructions: u64,
148 grouped_stream_local_instructions: u64,
149 grouped_fold_local_instructions: u64,
150 grouped_finalize_local_instructions: u64,
151 grouped_count: GroupedCountAttribution,
152}
153
154#[cfg(feature = "diagnostics")]
155#[expect(
156 clippy::missing_const_for_fn,
157 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
158)]
159fn read_query_local_instruction_counter() -> u64 {
160 #[cfg(target_arch = "wasm32")]
161 {
162 canic_cdk::api::performance_counter(1)
163 }
164
165 #[cfg(not(target_arch = "wasm32"))]
166 {
167 0
168 }
169}
170
171#[cfg(feature = "diagnostics")]
172fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
173 let start = read_query_local_instruction_counter();
174 let result = run();
175 let delta = read_query_local_instruction_counter().saturating_sub(start);
176
177 (delta, result)
178}
179
180impl<C: CanisterKind> DbSession<C> {
181 #[cfg(feature = "diagnostics")]
182 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
183 QueryExecutePhaseAttribution {
184 runtime_local_instructions: 0,
185 finalize_local_instructions: 0,
186 direct_data_row_scan_local_instructions: 0,
187 direct_data_row_key_stream_local_instructions: 0,
188 direct_data_row_row_read_local_instructions: 0,
189 direct_data_row_key_encode_local_instructions: 0,
190 direct_data_row_store_get_local_instructions: 0,
191 direct_data_row_order_window_local_instructions: 0,
192 direct_data_row_page_window_local_instructions: 0,
193 grouped_stream_local_instructions: 0,
194 grouped_fold_local_instructions: 0,
195 grouped_finalize_local_instructions: 0,
196 grouped_count: GroupedCountAttribution::none(),
197 }
198 }
199
200 #[cfg(feature = "diagnostics")]
201 const fn scalar_query_execute_phase_attribution(
202 phase: ScalarExecutePhaseAttribution,
203 ) -> QueryExecutePhaseAttribution {
204 QueryExecutePhaseAttribution {
205 runtime_local_instructions: phase.runtime_local_instructions,
206 finalize_local_instructions: phase.finalize_local_instructions,
207 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
208 direct_data_row_key_stream_local_instructions: phase
209 .direct_data_row_key_stream_local_instructions,
210 direct_data_row_row_read_local_instructions: phase
211 .direct_data_row_row_read_local_instructions,
212 direct_data_row_key_encode_local_instructions: phase
213 .direct_data_row_key_encode_local_instructions,
214 direct_data_row_store_get_local_instructions: phase
215 .direct_data_row_store_get_local_instructions,
216 direct_data_row_order_window_local_instructions: phase
217 .direct_data_row_order_window_local_instructions,
218 direct_data_row_page_window_local_instructions: phase
219 .direct_data_row_page_window_local_instructions,
220 grouped_stream_local_instructions: 0,
221 grouped_fold_local_instructions: 0,
222 grouped_finalize_local_instructions: 0,
223 grouped_count: GroupedCountAttribution::none(),
224 }
225 }
226
227 #[cfg(feature = "diagnostics")]
228 const fn grouped_query_execute_phase_attribution(
229 phase: GroupedExecutePhaseAttribution,
230 ) -> QueryExecutePhaseAttribution {
231 QueryExecutePhaseAttribution {
232 runtime_local_instructions: phase
233 .stream_local_instructions
234 .saturating_add(phase.fold_local_instructions),
235 finalize_local_instructions: phase.finalize_local_instructions,
236 direct_data_row_scan_local_instructions: 0,
237 direct_data_row_key_stream_local_instructions: 0,
238 direct_data_row_row_read_local_instructions: 0,
239 direct_data_row_key_encode_local_instructions: 0,
240 direct_data_row_store_get_local_instructions: 0,
241 direct_data_row_order_window_local_instructions: 0,
242 direct_data_row_page_window_local_instructions: 0,
243 grouped_stream_local_instructions: phase.stream_local_instructions,
244 grouped_fold_local_instructions: phase.fold_local_instructions,
245 grouped_finalize_local_instructions: phase.finalize_local_instructions,
246 grouped_count: phase.grouped_count,
247 }
248 }
249
250 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
251 let scope_id = self.db.cache_scope_id();
252
253 QUERY_PLAN_CACHES.with(|caches| {
254 let mut caches = caches.borrow_mut();
255 let cache = caches.entry(scope_id).or_default();
256
257 f(cache)
258 })
259 }
260
261 const fn visible_indexes_for_model(
262 model: &'static EntityModel,
263 visibility: QueryPlanVisibility,
264 ) -> VisibleIndexes<'static> {
265 match visibility {
266 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
267 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
268 }
269 }
270
271 #[cfg(test)]
272 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
273 self.with_query_plan_cache(|cache| cache.len())
274 }
275
276 #[cfg(test)]
277 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
278 self.with_query_plan_cache(QueryPlanCache::clear);
279 }
280
281 pub(in crate::db) fn query_plan_visibility_for_store_path(
282 &self,
283 store_path: &'static str,
284 ) -> Result<QueryPlanVisibility, QueryError> {
285 let store = self
286 .db
287 .recovered_store(store_path)
288 .map_err(QueryError::execute)?;
289 let visibility = if store.index_state() == crate::db::IndexState::Ready {
290 QueryPlanVisibility::StoreReady
291 } else {
292 QueryPlanVisibility::StoreNotReady
293 };
294
295 Ok(visibility)
296 }
297
298 pub(in crate::db) fn cached_shared_query_plan_for_authority(
299 &self,
300 authority: crate::db::executor::EntityAuthority,
301 schema_fingerprint: CommitSchemaFingerprint,
302 query: &StructuralQuery,
303 ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
304 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
305 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
306 let planning_state = query.prepare_scalar_planning_state()?;
307 let normalized_predicate_fingerprint = planning_state
308 .normalized_predicate()
309 .map(predicate_fingerprint_normalized);
310 let cache_key =
311 QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
312 authority,
313 schema_fingerprint,
314 visibility,
315 query,
316 normalized_predicate_fingerprint,
317 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
318 );
319
320 {
321 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
322 if let Some(prepared_plan) = cached {
323 return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
324 }
325 }
326
327 let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
328 &visible_indexes,
329 planning_state,
330 )?;
331 let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
332 self.with_query_plan_cache(|cache| {
333 cache.insert(cache_key, prepared_plan.clone());
334 });
335
336 Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
337 }
338
339 #[cfg(test)]
340 pub(in crate::db) fn query_plan_cache_key_for_tests(
341 authority: crate::db::executor::EntityAuthority,
342 schema_fingerprint: CommitSchemaFingerprint,
343 visibility: QueryPlanVisibility,
344 query: &StructuralQuery,
345 cache_method_version: u8,
346 ) -> QueryPlanCacheKey {
347 QueryPlanCacheKey::for_authority_with_method_version(
348 authority,
349 schema_fingerprint,
350 visibility,
351 query,
352 cache_method_version,
353 )
354 }
355
356 fn with_query_visible_indexes<E, T>(
359 &self,
360 query: &Query<E>,
361 op: impl FnOnce(
362 &Query<E>,
363 &crate::db::query::plan::VisibleIndexes<'static>,
364 ) -> Result<T, QueryError>,
365 ) -> Result<T, QueryError>
366 where
367 E: EntityKind<Canister = C>,
368 {
369 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
370 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
371
372 op(query, &visible_indexes)
373 }
374
375 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
376 &self,
377 query: &Query<E>,
378 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
379 where
380 E: EntityKind<Canister = C>,
381 {
382 let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
383
384 Ok((prepared_plan.typed_clone::<E>(), attribution))
385 }
386
387 fn cached_shared_query_plan_for_entity<E>(
390 &self,
391 query: &Query<E>,
392 ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
393 where
394 E: EntityKind<Canister = C>,
395 {
396 self.cached_shared_query_plan_for_authority(
397 crate::db::executor::EntityAuthority::for_type::<E>(),
398 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
399 query.structural(),
400 )
401 }
402
403 fn map_cached_shared_query_plan_for_entity<E, T>(
406 &self,
407 query: &Query<E>,
408 map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
409 ) -> Result<T, QueryError>
410 where
411 E: EntityKind<Canister = C>,
412 {
413 let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
414
415 Ok(map(prepared_plan))
416 }
417
418 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
421 &self,
422 query: &Query<E>,
423 ) -> Result<CompiledQuery<E>, QueryError>
424 where
425 E: EntityKind<Canister = C>,
426 {
427 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
428 }
429
430 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
433 &self,
434 query: &Query<E>,
435 ) -> Result<PlannedQuery<E>, QueryError>
436 where
437 E: EntityKind<Canister = C>,
438 {
439 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
440 }
441
442 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
444 &self,
445 query: &Query<E>,
446 ) -> Result<ExplainPlan, QueryError>
447 where
448 E: EntityKind<Canister = C>,
449 {
450 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
451 }
452
453 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
456 &self,
457 query: &Query<E>,
458 ) -> Result<String, QueryError>
459 where
460 E: EntityKind<Canister = C>,
461 {
462 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
463 }
464
465 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
468 &self,
469 query: &Query<E>,
470 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
471 where
472 E: EntityValue + EntityKind<Canister = C>,
473 {
474 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
475 }
476
477 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
480 &self,
481 query: &Query<E>,
482 ) -> Result<String, QueryError>
483 where
484 E: EntityValue + EntityKind<Canister = C>,
485 {
486 self.with_query_visible_indexes(
487 query,
488 Query::<E>::explain_execution_verbose_with_visible_indexes,
489 )
490 }
491
492 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
495 &self,
496 query: &Query<E>,
497 strategy: &S,
498 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
499 where
500 E: EntityValue + EntityKind<Canister = C>,
501 S: PreparedFluentAggregateExplainStrategy,
502 {
503 self.with_query_visible_indexes(query, |query, visible_indexes| {
504 query
505 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
506 })
507 }
508
509 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
512 &self,
513 query: &Query<E>,
514 target_field: &str,
515 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
516 where
517 E: EntityValue + EntityKind<Canister = C>,
518 {
519 self.with_query_visible_indexes(query, |query, visible_indexes| {
520 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
521 })
522 }
523
524 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
527 &self,
528 query: &Query<E>,
529 strategy: &PreparedFluentProjectionStrategy,
530 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
531 where
532 E: EntityValue + EntityKind<Canister = C>,
533 {
534 self.with_query_visible_indexes(query, |query, visible_indexes| {
535 query.explain_prepared_projection_terminal_with_visible_indexes(
536 visible_indexes,
537 strategy,
538 )
539 })
540 }
541
542 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
545 match family {
546 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
547 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
548 )),
549 ExecutionFamily::Ordered => Ok(()),
550 ExecutionFamily::Grouped => Err(QueryError::invariant(
551 "grouped queries execute via execute(), not page().execute()",
552 )),
553 }
554 }
555
556 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
559 match family {
560 ExecutionFamily::Grouped => Ok(()),
561 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
562 "grouped execution requires grouped logical plans",
563 )),
564 }
565 }
566
567 fn finalize_grouped_execution_page(
571 page: GroupedCursorPage,
572 trace: Option<ExecutionTrace>,
573 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
574 let next_cursor = page
575 .next_cursor
576 .map(|token| {
577 let Some(token) = token.as_grouped() else {
578 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
579 };
580
581 token.encode().map_err(|err| {
582 QueryError::serialize_internal(format!(
583 "failed to serialize grouped continuation cursor: {err}"
584 ))
585 })
586 })
587 .transpose()?;
588
589 Ok(PagedGroupedExecutionWithTrace::new(
590 page.rows,
591 next_cursor,
592 trace,
593 ))
594 }
595
596 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
598 where
599 E: PersistedRow<Canister = C> + EntityValue,
600 {
601 let mode = query.mode();
603 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
604
605 self.execute_query_dyn(mode, plan)
607 }
608
609 #[cfg(feature = "diagnostics")]
612 #[doc(hidden)]
613 #[expect(
614 clippy::too_many_lines,
615 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
616 )]
617 pub fn execute_query_result_with_attribution<E>(
618 &self,
619 query: &Query<E>,
620 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
621 where
622 E: PersistedRow<Canister = C> + EntityValue,
623 {
624 let (compile_local_instructions, plan_and_cache) =
629 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
630 let (plan, cache_attribution) = plan_and_cache?;
631
632 let (execute_local_instructions, result) = measure_query_stage(
635 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
636 if query.has_grouping() {
637 let (page, trace, phase_attribution) =
638 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
639 executor
640 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
641 plan, cursor,
642 )
643 })?;
644 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
645
646 Ok((
647 LoadQueryResult::Grouped(grouped),
648 Self::grouped_query_execute_phase_attribution(phase_attribution),
649 0,
650 ))
651 } else {
652 match query.mode() {
653 QueryMode::Load(_) => {
654 let (rows, phase_attribution, response_decode_local_instructions) =
655 self.load_executor::<E>()
656 .execute_with_phase_attribution(plan)
657 .map_err(QueryError::execute)?;
658
659 Ok((
660 LoadQueryResult::Rows(rows),
661 Self::scalar_query_execute_phase_attribution(phase_attribution),
662 response_decode_local_instructions,
663 ))
664 }
665 QueryMode::Delete(_) => {
666 let result = self.execute_query_dyn(query.mode(), plan)?;
667
668 Ok((
669 LoadQueryResult::Rows(result),
670 Self::empty_query_execute_phase_attribution(),
671 0,
672 ))
673 }
674 }
675 }
676 },
677 );
678 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
679 let total_local_instructions =
680 compile_local_instructions.saturating_add(execute_local_instructions);
681
682 Ok((
683 result,
684 QueryExecutionAttribution {
685 compile_local_instructions,
686 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
687 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
688 direct_data_row_scan_local_instructions: execute_phase_attribution
689 .direct_data_row_scan_local_instructions,
690 direct_data_row_key_stream_local_instructions: execute_phase_attribution
691 .direct_data_row_key_stream_local_instructions,
692 direct_data_row_row_read_local_instructions: execute_phase_attribution
693 .direct_data_row_row_read_local_instructions,
694 direct_data_row_key_encode_local_instructions: execute_phase_attribution
695 .direct_data_row_key_encode_local_instructions,
696 direct_data_row_store_get_local_instructions: execute_phase_attribution
697 .direct_data_row_store_get_local_instructions,
698 direct_data_row_order_window_local_instructions: execute_phase_attribution
699 .direct_data_row_order_window_local_instructions,
700 direct_data_row_page_window_local_instructions: execute_phase_attribution
701 .direct_data_row_page_window_local_instructions,
702 grouped_stream_local_instructions: execute_phase_attribution
703 .grouped_stream_local_instructions,
704 grouped_fold_local_instructions: execute_phase_attribution
705 .grouped_fold_local_instructions,
706 grouped_finalize_local_instructions: execute_phase_attribution
707 .grouped_finalize_local_instructions,
708 grouped_count_borrowed_hash_computations: execute_phase_attribution
709 .grouped_count
710 .borrowed_hash_computations,
711 grouped_count_bucket_candidate_checks: execute_phase_attribution
712 .grouped_count
713 .bucket_candidate_checks,
714 grouped_count_existing_group_hits: execute_phase_attribution
715 .grouped_count
716 .existing_group_hits,
717 grouped_count_new_group_inserts: execute_phase_attribution
718 .grouped_count
719 .new_group_inserts,
720 grouped_count_row_materialization_local_instructions: execute_phase_attribution
721 .grouped_count
722 .row_materialization_local_instructions,
723 grouped_count_group_lookup_local_instructions: execute_phase_attribution
724 .grouped_count
725 .group_lookup_local_instructions,
726 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
727 .grouped_count
728 .existing_group_update_local_instructions,
729 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
730 .grouped_count
731 .new_group_insert_local_instructions,
732 response_decode_local_instructions,
733 execute_local_instructions,
734 total_local_instructions,
735 shared_query_plan_cache_hits: cache_attribution.hits,
736 shared_query_plan_cache_misses: cache_attribution.misses,
737 },
738 ))
739 }
740
741 #[doc(hidden)]
744 pub fn execute_query_result<E>(
745 &self,
746 query: &Query<E>,
747 ) -> Result<LoadQueryResult<E>, QueryError>
748 where
749 E: PersistedRow<Canister = C> + EntityValue,
750 {
751 if query.has_grouping() {
752 return self
753 .execute_grouped(query, None)
754 .map(LoadQueryResult::Grouped);
755 }
756
757 self.execute_query(query).map(LoadQueryResult::Rows)
758 }
759
760 #[doc(hidden)]
762 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
763 where
764 E: PersistedRow<Canister = C> + EntityValue,
765 {
766 if !query.mode().is_delete() {
768 return Err(QueryError::unsupported_query(
769 "delete count execution requires delete query mode",
770 ));
771 }
772
773 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
777
778 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
780 .map_err(QueryError::execute)
781 }
782
783 pub(in crate::db) fn execute_query_dyn<E>(
788 &self,
789 mode: QueryMode,
790 plan: PreparedExecutionPlan<E>,
791 ) -> Result<EntityResponse<E>, QueryError>
792 where
793 E: PersistedRow<Canister = C> + EntityValue,
794 {
795 let result = match mode {
796 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
797 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
798 };
799
800 result.map_err(QueryError::execute)
801 }
802
803 pub(in crate::db) fn execute_load_query_with<E, T>(
806 &self,
807 query: &Query<E>,
808 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
809 ) -> Result<T, QueryError>
810 where
811 E: PersistedRow<Canister = C> + EntityValue,
812 {
813 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
814
815 self.with_metrics(|| op(self.load_executor::<E>(), plan))
816 .map_err(QueryError::execute)
817 }
818
819 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
824 where
825 E: EntityKind<Canister = C>,
826 {
827 let (prepared_plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
828 let logical_plan = prepared_plan.logical_plan();
829 let explain = logical_plan.explain();
830 let plan_hash = logical_plan.fingerprint().to_string();
831 let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
832 let execution_family = match query.mode() {
833 QueryMode::Load(_) => Some(
834 prepared_plan
835 .execution_family()
836 .map_err(QueryError::execute)?,
837 ),
838 QueryMode::Delete(_) => None,
839 };
840
841 Ok(QueryTracePlan::new(
842 plan_hash,
843 access_strategy,
844 execution_family,
845 explain,
846 ))
847 }
848
849 pub(crate) fn execute_load_query_paged_with_trace<E>(
851 &self,
852 query: &Query<E>,
853 cursor_token: Option<&str>,
854 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
855 where
856 E: PersistedRow<Canister = C> + EntityValue,
857 {
858 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
860 Self::ensure_scalar_paged_execution_family(
861 plan.execution_family().map_err(QueryError::execute)?,
862 )?;
863
864 let cursor_bytes = decode_optional_cursor_token(cursor_token)
866 .map_err(QueryError::from_cursor_plan_error)?;
867 let cursor = plan
868 .prepare_cursor(cursor_bytes.as_deref())
869 .map_err(QueryError::from_executor_plan_error)?;
870
871 let (page, trace) = self
873 .with_metrics(|| {
874 self.load_executor::<E>()
875 .execute_paged_with_cursor_traced(plan, cursor)
876 })
877 .map_err(QueryError::execute)?;
878 let next_cursor = page
879 .next_cursor
880 .map(|token| {
881 let Some(token) = token.as_scalar() else {
882 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
883 };
884
885 token.encode().map_err(|err| {
886 QueryError::serialize_internal(format!(
887 "failed to serialize continuation cursor: {err}"
888 ))
889 })
890 })
891 .transpose()?;
892
893 Ok(PagedLoadExecutionWithTrace::new(
894 page.items,
895 next_cursor,
896 trace,
897 ))
898 }
899
900 pub(in crate::db) fn execute_grouped<E>(
905 &self,
906 query: &Query<E>,
907 cursor_token: Option<&str>,
908 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
909 where
910 E: PersistedRow<Canister = C> + EntityValue,
911 {
912 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
914
915 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
918
919 Self::finalize_grouped_execution_page(page, trace)
920 }
921
922 fn execute_grouped_plan_with<E, T>(
925 &self,
926 plan: PreparedExecutionPlan<E>,
927 cursor_token: Option<&str>,
928 op: impl FnOnce(
929 LoadExecutor<E>,
930 PreparedExecutionPlan<E>,
931 crate::db::cursor::GroupedPlannedCursor,
932 ) -> Result<T, InternalError>,
933 ) -> Result<T, QueryError>
934 where
935 E: PersistedRow<Canister = C> + EntityValue,
936 {
937 Self::ensure_grouped_execution_family(
939 plan.execution_family().map_err(QueryError::execute)?,
940 )?;
941
942 let cursor = decode_optional_grouped_cursor_token(cursor_token)
944 .map_err(QueryError::from_cursor_plan_error)?;
945 let cursor = plan
946 .prepare_grouped_cursor_token(cursor)
947 .map_err(QueryError::from_executor_plan_error)?;
948
949 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
952 .map_err(QueryError::execute)
953 }
954
955 fn execute_grouped_plan_with_trace<E>(
957 &self,
958 plan: PreparedExecutionPlan<E>,
959 cursor_token: Option<&str>,
960 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
961 where
962 E: PersistedRow<Canister = C> + EntityValue,
963 {
964 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
965 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
966 })
967 }
968}
969
970impl QueryPlanCacheKey {
971 const fn from_authority_parts(
975 authority: crate::db::executor::EntityAuthority,
976 schema_fingerprint: CommitSchemaFingerprint,
977 visibility: QueryPlanVisibility,
978 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
979 cache_method_version: u8,
980 ) -> Self {
981 Self {
982 cache_method_version,
983 entity_path: authority.entity_path(),
984 schema_fingerprint,
985 visibility,
986 structural_query,
987 }
988 }
989
990 #[cfg(test)]
991 fn for_authority_with_method_version(
992 authority: crate::db::executor::EntityAuthority,
993 schema_fingerprint: CommitSchemaFingerprint,
994 visibility: QueryPlanVisibility,
995 query: &StructuralQuery,
996 cache_method_version: u8,
997 ) -> Self {
998 Self::from_authority_parts(
999 authority,
1000 schema_fingerprint,
1001 visibility,
1002 query.structural_cache_key(),
1003 cache_method_version,
1004 )
1005 }
1006
1007 fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1008 authority: crate::db::executor::EntityAuthority,
1009 schema_fingerprint: CommitSchemaFingerprint,
1010 visibility: QueryPlanVisibility,
1011 query: &StructuralQuery,
1012 normalized_predicate_fingerprint: Option<[u8; 32]>,
1013 cache_method_version: u8,
1014 ) -> Self {
1015 Self::from_authority_parts(
1016 authority,
1017 schema_fingerprint,
1018 visibility,
1019 query.structural_cache_key_with_normalized_predicate_fingerprint(
1020 normalized_predicate_fingerprint,
1021 ),
1022 cache_method_version,
1023 )
1024 }
1025}