1#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12 db::{
13 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15 access::AccessStrategy,
16 commit::CommitSchemaFingerprint,
17 cursor::{
18 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
19 },
20 diagnostics::ExecutionTrace,
21 executor::{
22 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
23 SharedPreparedExecutionPlan,
24 },
25 query::builder::{
26 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
27 },
28 query::explain::{
29 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
30 },
31 query::{
32 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
33 plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
34 },
35 },
36 error::InternalError,
37 model::entity::EntityModel,
38 traits::{CanisterKind, EntityKind, EntityValue, Path},
39};
40#[cfg(feature = "diagnostics")]
41use candid::CandidType;
42use icydb_utils::Xxh3;
43#[cfg(feature = "diagnostics")]
44use serde::Deserialize;
45use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
46
47type CacheBuildHasher = BuildHasherDefault<Xxh3>;
48
49const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
52
53#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
54pub(in crate::db) enum QueryPlanVisibility {
55 StoreNotReady,
56 StoreReady,
57}
58
59#[derive(Clone, Debug, Eq, Hash, PartialEq)]
60pub(in crate::db) struct QueryPlanCacheKey {
61 cache_method_version: u8,
62 entity_path: &'static str,
63 schema_fingerprint: CommitSchemaFingerprint,
64 visibility: QueryPlanVisibility,
65 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
66}
67
68#[derive(Clone, Debug)]
69pub(in crate::db) struct QueryPlanCacheEntry {
70 logical_plan: AccessPlannedQuery,
71 prepared_plan: SharedPreparedExecutionPlan,
72}
73
74impl QueryPlanCacheEntry {
75 #[must_use]
76 pub(in crate::db) const fn new(
77 logical_plan: AccessPlannedQuery,
78 prepared_plan: SharedPreparedExecutionPlan,
79 ) -> Self {
80 Self {
81 logical_plan,
82 prepared_plan,
83 }
84 }
85
86 #[must_use]
87 pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
88 &self.logical_plan
89 }
90
91 #[must_use]
92 pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
93 self.prepared_plan.typed_clone::<E>()
94 }
95
96 #[must_use]
97 pub(in crate::db) const fn prepared_plan(&self) -> &SharedPreparedExecutionPlan {
98 &self.prepared_plan
99 }
100}
101
102pub(in crate::db) type QueryPlanCache =
103 HashMap<QueryPlanCacheKey, QueryPlanCacheEntry, CacheBuildHasher>;
104
105thread_local! {
106 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
111 RefCell::new(HashMap::default());
112}
113
114#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
115pub(in crate::db) struct QueryPlanCacheAttribution {
116 pub hits: u64,
117 pub misses: u64,
118}
119
120impl QueryPlanCacheAttribution {
121 #[must_use]
122 const fn hit() -> Self {
123 Self { hits: 1, misses: 0 }
124 }
125
126 #[must_use]
127 const fn miss() -> Self {
128 Self { hits: 0, misses: 1 }
129 }
130}
131
132#[cfg(feature = "diagnostics")]
139#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
140pub struct QueryExecutionAttribution {
141 pub compile_local_instructions: u64,
142 pub runtime_local_instructions: u64,
143 pub finalize_local_instructions: u64,
144 pub direct_data_row_scan_local_instructions: u64,
145 pub direct_data_row_key_stream_local_instructions: u64,
146 pub direct_data_row_row_read_local_instructions: u64,
147 pub direct_data_row_key_encode_local_instructions: u64,
148 pub direct_data_row_store_get_local_instructions: u64,
149 pub direct_data_row_order_window_local_instructions: u64,
150 pub direct_data_row_page_window_local_instructions: u64,
151 pub grouped_stream_local_instructions: u64,
152 pub grouped_fold_local_instructions: u64,
153 pub grouped_finalize_local_instructions: u64,
154 pub grouped_count_borrowed_hash_computations: u64,
155 pub grouped_count_bucket_candidate_checks: u64,
156 pub grouped_count_existing_group_hits: u64,
157 pub grouped_count_new_group_inserts: u64,
158 pub grouped_count_row_materialization_local_instructions: u64,
159 pub grouped_count_group_lookup_local_instructions: u64,
160 pub grouped_count_existing_group_update_local_instructions: u64,
161 pub grouped_count_new_group_insert_local_instructions: u64,
162 pub response_decode_local_instructions: u64,
163 pub execute_local_instructions: u64,
164 pub total_local_instructions: u64,
165 pub shared_query_plan_cache_hits: u64,
166 pub shared_query_plan_cache_misses: u64,
167}
168
169#[cfg(feature = "diagnostics")]
170#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
171struct QueryExecutePhaseAttribution {
172 runtime_local_instructions: u64,
173 finalize_local_instructions: u64,
174 direct_data_row_scan_local_instructions: u64,
175 direct_data_row_key_stream_local_instructions: u64,
176 direct_data_row_row_read_local_instructions: u64,
177 direct_data_row_key_encode_local_instructions: u64,
178 direct_data_row_store_get_local_instructions: u64,
179 direct_data_row_order_window_local_instructions: u64,
180 direct_data_row_page_window_local_instructions: u64,
181 grouped_stream_local_instructions: u64,
182 grouped_fold_local_instructions: u64,
183 grouped_finalize_local_instructions: u64,
184 grouped_count: GroupedCountAttribution,
185}
186
187#[cfg(feature = "diagnostics")]
188#[expect(
189 clippy::missing_const_for_fn,
190 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
191)]
192fn read_query_local_instruction_counter() -> u64 {
193 #[cfg(target_arch = "wasm32")]
194 {
195 canic_cdk::api::performance_counter(1)
196 }
197
198 #[cfg(not(target_arch = "wasm32"))]
199 {
200 0
201 }
202}
203
204#[cfg(feature = "diagnostics")]
205fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
206 let start = read_query_local_instruction_counter();
207 let result = run();
208 let delta = read_query_local_instruction_counter().saturating_sub(start);
209
210 (delta, result)
211}
212
213impl<C: CanisterKind> DbSession<C> {
214 #[cfg(feature = "diagnostics")]
215 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
216 QueryExecutePhaseAttribution {
217 runtime_local_instructions: 0,
218 finalize_local_instructions: 0,
219 direct_data_row_scan_local_instructions: 0,
220 direct_data_row_key_stream_local_instructions: 0,
221 direct_data_row_row_read_local_instructions: 0,
222 direct_data_row_key_encode_local_instructions: 0,
223 direct_data_row_store_get_local_instructions: 0,
224 direct_data_row_order_window_local_instructions: 0,
225 direct_data_row_page_window_local_instructions: 0,
226 grouped_stream_local_instructions: 0,
227 grouped_fold_local_instructions: 0,
228 grouped_finalize_local_instructions: 0,
229 grouped_count: GroupedCountAttribution::none(),
230 }
231 }
232
233 #[cfg(feature = "diagnostics")]
234 const fn scalar_query_execute_phase_attribution(
235 phase: ScalarExecutePhaseAttribution,
236 ) -> QueryExecutePhaseAttribution {
237 QueryExecutePhaseAttribution {
238 runtime_local_instructions: phase.runtime_local_instructions,
239 finalize_local_instructions: phase.finalize_local_instructions,
240 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
241 direct_data_row_key_stream_local_instructions: phase
242 .direct_data_row_key_stream_local_instructions,
243 direct_data_row_row_read_local_instructions: phase
244 .direct_data_row_row_read_local_instructions,
245 direct_data_row_key_encode_local_instructions: phase
246 .direct_data_row_key_encode_local_instructions,
247 direct_data_row_store_get_local_instructions: phase
248 .direct_data_row_store_get_local_instructions,
249 direct_data_row_order_window_local_instructions: phase
250 .direct_data_row_order_window_local_instructions,
251 direct_data_row_page_window_local_instructions: phase
252 .direct_data_row_page_window_local_instructions,
253 grouped_stream_local_instructions: 0,
254 grouped_fold_local_instructions: 0,
255 grouped_finalize_local_instructions: 0,
256 grouped_count: GroupedCountAttribution::none(),
257 }
258 }
259
260 #[cfg(feature = "diagnostics")]
261 const fn grouped_query_execute_phase_attribution(
262 phase: GroupedExecutePhaseAttribution,
263 ) -> QueryExecutePhaseAttribution {
264 QueryExecutePhaseAttribution {
265 runtime_local_instructions: phase
266 .stream_local_instructions
267 .saturating_add(phase.fold_local_instructions),
268 finalize_local_instructions: phase.finalize_local_instructions,
269 direct_data_row_scan_local_instructions: 0,
270 direct_data_row_key_stream_local_instructions: 0,
271 direct_data_row_row_read_local_instructions: 0,
272 direct_data_row_key_encode_local_instructions: 0,
273 direct_data_row_store_get_local_instructions: 0,
274 direct_data_row_order_window_local_instructions: 0,
275 direct_data_row_page_window_local_instructions: 0,
276 grouped_stream_local_instructions: phase.stream_local_instructions,
277 grouped_fold_local_instructions: phase.fold_local_instructions,
278 grouped_finalize_local_instructions: phase.finalize_local_instructions,
279 grouped_count: phase.grouped_count,
280 }
281 }
282
283 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
284 let scope_id = self.db.cache_scope_id();
285
286 QUERY_PLAN_CACHES.with(|caches| {
287 let mut caches = caches.borrow_mut();
288 let cache = caches.entry(scope_id).or_default();
289
290 f(cache)
291 })
292 }
293
294 const fn visible_indexes_for_model(
295 model: &'static EntityModel,
296 visibility: QueryPlanVisibility,
297 ) -> VisibleIndexes<'static> {
298 match visibility {
299 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
300 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
301 }
302 }
303
304 #[cfg(test)]
305 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
306 self.with_query_plan_cache(|cache| cache.len())
307 }
308
309 #[cfg(test)]
310 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
311 self.with_query_plan_cache(QueryPlanCache::clear);
312 }
313
314 pub(in crate::db) fn query_plan_visibility_for_store_path(
315 &self,
316 store_path: &'static str,
317 ) -> Result<QueryPlanVisibility, QueryError> {
318 let store = self
319 .db
320 .recovered_store(store_path)
321 .map_err(QueryError::execute)?;
322 let visibility = if store.index_state() == crate::db::IndexState::Ready {
323 QueryPlanVisibility::StoreReady
324 } else {
325 QueryPlanVisibility::StoreNotReady
326 };
327
328 Ok(visibility)
329 }
330
331 pub(in crate::db) fn cached_query_plan_entry_for_authority(
332 &self,
333 authority: crate::db::executor::EntityAuthority,
334 schema_fingerprint: CommitSchemaFingerprint,
335 query: &StructuralQuery,
336 ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
337 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
338 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
339 let normalized_predicate = query.prepare_normalized_scalar_predicate()?;
340 let cache_key =
341 QueryPlanCacheKey::for_authority_with_normalized_predicate_and_method_version(
342 authority,
343 schema_fingerprint,
344 visibility,
345 query,
346 normalized_predicate.as_ref(),
347 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
348 );
349
350 {
351 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
352 if let Some(entry) = cached {
353 return Ok((entry, QueryPlanCacheAttribution::hit()));
354 }
355 }
356
357 let plan = query.build_plan_with_visible_indexes_from_normalized_predicate(
358 &visible_indexes,
359 normalized_predicate,
360 )?;
361 let entry = QueryPlanCacheEntry::new(
362 plan.clone(),
363 SharedPreparedExecutionPlan::from_plan(authority, plan),
364 );
365 self.with_query_plan_cache(|cache| {
366 cache.insert(cache_key, entry.clone());
367 });
368
369 Ok((entry, QueryPlanCacheAttribution::miss()))
370 }
371
372 #[cfg(test)]
373 pub(in crate::db) fn query_plan_cache_key_for_tests(
374 authority: crate::db::executor::EntityAuthority,
375 schema_fingerprint: CommitSchemaFingerprint,
376 visibility: QueryPlanVisibility,
377 query: &StructuralQuery,
378 cache_method_version: u8,
379 ) -> QueryPlanCacheKey {
380 QueryPlanCacheKey::for_authority_with_method_version(
381 authority,
382 schema_fingerprint,
383 visibility,
384 query,
385 cache_method_version,
386 )
387 }
388
389 fn with_query_visible_indexes<E, T>(
392 &self,
393 query: &Query<E>,
394 op: impl FnOnce(
395 &Query<E>,
396 &crate::db::query::plan::VisibleIndexes<'static>,
397 ) -> Result<T, QueryError>,
398 ) -> Result<T, QueryError>
399 where
400 E: EntityKind<Canister = C>,
401 {
402 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
403 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
404
405 op(query, &visible_indexes)
406 }
407
408 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
409 &self,
410 query: &StructuralQuery,
411 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
412 where
413 E: EntityKind<Canister = C>,
414 {
415 let (entry, attribution) = self.cached_query_plan_entry_for_authority(
416 crate::db::executor::EntityAuthority::for_type::<E>(),
417 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
418 query,
419 )?;
420
421 Ok((entry.typed_prepared_plan::<E>(), attribution))
422 }
423
424 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
427 &self,
428 query: &Query<E>,
429 ) -> Result<CompiledQuery<E>, QueryError>
430 where
431 E: EntityKind<Canister = C>,
432 {
433 let (entry, _) = self.cached_query_plan_entry_for_authority(
434 crate::db::executor::EntityAuthority::for_type::<E>(),
435 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
436 query.structural(),
437 )?;
438
439 Ok(Query::<E>::compiled_query_from_plan(
440 entry.logical_plan().clone(),
441 ))
442 }
443
444 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
447 &self,
448 query: &Query<E>,
449 ) -> Result<PlannedQuery<E>, QueryError>
450 where
451 E: EntityKind<Canister = C>,
452 {
453 let (entry, _) = self.cached_query_plan_entry_for_authority(
454 crate::db::executor::EntityAuthority::for_type::<E>(),
455 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
456 query.structural(),
457 )?;
458
459 Ok(Query::<E>::planned_query_from_plan(
460 entry.logical_plan().clone(),
461 ))
462 }
463
464 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
466 &self,
467 query: &Query<E>,
468 ) -> Result<ExplainPlan, QueryError>
469 where
470 E: EntityKind<Canister = C>,
471 {
472 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
473 }
474
475 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
478 &self,
479 query: &Query<E>,
480 ) -> Result<String, QueryError>
481 where
482 E: EntityKind<Canister = C>,
483 {
484 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
485 }
486
487 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
490 &self,
491 query: &Query<E>,
492 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
493 where
494 E: EntityValue + EntityKind<Canister = C>,
495 {
496 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
497 }
498
499 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
502 &self,
503 query: &Query<E>,
504 ) -> Result<String, QueryError>
505 where
506 E: EntityValue + EntityKind<Canister = C>,
507 {
508 self.with_query_visible_indexes(
509 query,
510 Query::<E>::explain_execution_verbose_with_visible_indexes,
511 )
512 }
513
514 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
517 &self,
518 query: &Query<E>,
519 strategy: &S,
520 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
521 where
522 E: EntityValue + EntityKind<Canister = C>,
523 S: PreparedFluentAggregateExplainStrategy,
524 {
525 self.with_query_visible_indexes(query, |query, visible_indexes| {
526 query
527 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
528 })
529 }
530
531 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
534 &self,
535 query: &Query<E>,
536 target_field: &str,
537 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
538 where
539 E: EntityValue + EntityKind<Canister = C>,
540 {
541 self.with_query_visible_indexes(query, |query, visible_indexes| {
542 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
543 })
544 }
545
546 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
549 &self,
550 query: &Query<E>,
551 strategy: &PreparedFluentProjectionStrategy,
552 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
553 where
554 E: EntityValue + EntityKind<Canister = C>,
555 {
556 self.with_query_visible_indexes(query, |query, visible_indexes| {
557 query.explain_prepared_projection_terminal_with_visible_indexes(
558 visible_indexes,
559 strategy,
560 )
561 })
562 }
563
564 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
567 match family {
568 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
569 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
570 )),
571 ExecutionFamily::Ordered => Ok(()),
572 ExecutionFamily::Grouped => Err(QueryError::invariant(
573 "grouped queries execute via execute(), not page().execute()",
574 )),
575 }
576 }
577
578 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
581 match family {
582 ExecutionFamily::Grouped => Ok(()),
583 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
584 "grouped execution requires grouped logical plans",
585 )),
586 }
587 }
588
589 fn finalize_grouped_execution_page(
593 page: GroupedCursorPage,
594 trace: Option<ExecutionTrace>,
595 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
596 let next_cursor = page
597 .next_cursor
598 .map(|token| {
599 let Some(token) = token.as_grouped() else {
600 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
601 };
602
603 token.encode().map_err(|err| {
604 QueryError::serialize_internal(format!(
605 "failed to serialize grouped continuation cursor: {err}"
606 ))
607 })
608 })
609 .transpose()?;
610
611 Ok(PagedGroupedExecutionWithTrace::new(
612 page.rows,
613 next_cursor,
614 trace,
615 ))
616 }
617
618 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
620 where
621 E: PersistedRow<Canister = C> + EntityValue,
622 {
623 let mode = query.mode();
625 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
626
627 self.execute_query_dyn(mode, plan)
629 }
630
631 #[cfg(feature = "diagnostics")]
634 #[doc(hidden)]
635 #[expect(
636 clippy::too_many_lines,
637 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
638 )]
639 pub fn execute_query_result_with_attribution<E>(
640 &self,
641 query: &Query<E>,
642 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
643 where
644 E: PersistedRow<Canister = C> + EntityValue,
645 {
646 let (compile_local_instructions, plan_and_cache) = measure_query_stage(|| {
651 self.cached_prepared_query_plan_for_entity::<E>(query.structural())
652 });
653 let (plan, cache_attribution) = plan_and_cache?;
654
655 let (execute_local_instructions, result) = measure_query_stage(
658 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
659 if query.has_grouping() {
660 let (page, trace, phase_attribution) =
661 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
662 executor
663 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
664 plan, cursor,
665 )
666 })?;
667 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
668
669 Ok((
670 LoadQueryResult::Grouped(grouped),
671 Self::grouped_query_execute_phase_attribution(phase_attribution),
672 0,
673 ))
674 } else {
675 match query.mode() {
676 QueryMode::Load(_) => {
677 let (rows, phase_attribution, response_decode_local_instructions) =
678 self.load_executor::<E>()
679 .execute_with_phase_attribution(plan)
680 .map_err(QueryError::execute)?;
681
682 Ok((
683 LoadQueryResult::Rows(rows),
684 Self::scalar_query_execute_phase_attribution(phase_attribution),
685 response_decode_local_instructions,
686 ))
687 }
688 QueryMode::Delete(_) => {
689 let result = self.execute_query_dyn(query.mode(), plan)?;
690
691 Ok((
692 LoadQueryResult::Rows(result),
693 Self::empty_query_execute_phase_attribution(),
694 0,
695 ))
696 }
697 }
698 }
699 },
700 );
701 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
702 let total_local_instructions =
703 compile_local_instructions.saturating_add(execute_local_instructions);
704
705 Ok((
706 result,
707 QueryExecutionAttribution {
708 compile_local_instructions,
709 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
710 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
711 direct_data_row_scan_local_instructions: execute_phase_attribution
712 .direct_data_row_scan_local_instructions,
713 direct_data_row_key_stream_local_instructions: execute_phase_attribution
714 .direct_data_row_key_stream_local_instructions,
715 direct_data_row_row_read_local_instructions: execute_phase_attribution
716 .direct_data_row_row_read_local_instructions,
717 direct_data_row_key_encode_local_instructions: execute_phase_attribution
718 .direct_data_row_key_encode_local_instructions,
719 direct_data_row_store_get_local_instructions: execute_phase_attribution
720 .direct_data_row_store_get_local_instructions,
721 direct_data_row_order_window_local_instructions: execute_phase_attribution
722 .direct_data_row_order_window_local_instructions,
723 direct_data_row_page_window_local_instructions: execute_phase_attribution
724 .direct_data_row_page_window_local_instructions,
725 grouped_stream_local_instructions: execute_phase_attribution
726 .grouped_stream_local_instructions,
727 grouped_fold_local_instructions: execute_phase_attribution
728 .grouped_fold_local_instructions,
729 grouped_finalize_local_instructions: execute_phase_attribution
730 .grouped_finalize_local_instructions,
731 grouped_count_borrowed_hash_computations: execute_phase_attribution
732 .grouped_count
733 .borrowed_hash_computations,
734 grouped_count_bucket_candidate_checks: execute_phase_attribution
735 .grouped_count
736 .bucket_candidate_checks,
737 grouped_count_existing_group_hits: execute_phase_attribution
738 .grouped_count
739 .existing_group_hits,
740 grouped_count_new_group_inserts: execute_phase_attribution
741 .grouped_count
742 .new_group_inserts,
743 grouped_count_row_materialization_local_instructions: execute_phase_attribution
744 .grouped_count
745 .row_materialization_local_instructions,
746 grouped_count_group_lookup_local_instructions: execute_phase_attribution
747 .grouped_count
748 .group_lookup_local_instructions,
749 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
750 .grouped_count
751 .existing_group_update_local_instructions,
752 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
753 .grouped_count
754 .new_group_insert_local_instructions,
755 response_decode_local_instructions,
756 execute_local_instructions,
757 total_local_instructions,
758 shared_query_plan_cache_hits: cache_attribution.hits,
759 shared_query_plan_cache_misses: cache_attribution.misses,
760 },
761 ))
762 }
763
764 #[doc(hidden)]
767 pub fn execute_query_result<E>(
768 &self,
769 query: &Query<E>,
770 ) -> Result<LoadQueryResult<E>, QueryError>
771 where
772 E: PersistedRow<Canister = C> + EntityValue,
773 {
774 if query.has_grouping() {
775 return self
776 .execute_grouped(query, None)
777 .map(LoadQueryResult::Grouped);
778 }
779
780 self.execute_query(query).map(LoadQueryResult::Rows)
781 }
782
783 #[doc(hidden)]
785 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
786 where
787 E: PersistedRow<Canister = C> + EntityValue,
788 {
789 if !query.mode().is_delete() {
791 return Err(QueryError::unsupported_query(
792 "delete count execution requires delete query mode",
793 ));
794 }
795
796 let plan = self
798 .compile_query_with_visible_indexes(query)?
799 .into_prepared_execution_plan();
800
801 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
803 .map_err(QueryError::execute)
804 }
805
806 pub(in crate::db) fn execute_query_dyn<E>(
811 &self,
812 mode: QueryMode,
813 plan: PreparedExecutionPlan<E>,
814 ) -> Result<EntityResponse<E>, QueryError>
815 where
816 E: PersistedRow<Canister = C> + EntityValue,
817 {
818 let result = match mode {
819 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
820 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
821 };
822
823 result.map_err(QueryError::execute)
824 }
825
826 pub(in crate::db) fn execute_load_query_with<E, T>(
829 &self,
830 query: &Query<E>,
831 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
832 ) -> Result<T, QueryError>
833 where
834 E: PersistedRow<Canister = C> + EntityValue,
835 {
836 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
837
838 self.with_metrics(|| op(self.load_executor::<E>(), plan))
839 .map_err(QueryError::execute)
840 }
841
842 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
847 where
848 E: EntityKind<Canister = C>,
849 {
850 let compiled = self.compile_query_with_visible_indexes(query)?;
851 let explain = compiled.explain();
852 let plan_hash = compiled.plan_hash_hex();
853
854 let (executable, _) =
855 self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
856 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
857 let execution_family = match query.mode() {
858 QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
859 QueryMode::Delete(_) => None,
860 };
861
862 Ok(QueryTracePlan::new(
863 plan_hash,
864 access_strategy,
865 execution_family,
866 explain,
867 ))
868 }
869
870 pub(crate) fn execute_load_query_paged_with_trace<E>(
872 &self,
873 query: &Query<E>,
874 cursor_token: Option<&str>,
875 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
876 where
877 E: PersistedRow<Canister = C> + EntityValue,
878 {
879 let plan = self
881 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
882 .0;
883 Self::ensure_scalar_paged_execution_family(
884 plan.execution_family().map_err(QueryError::execute)?,
885 )?;
886
887 let cursor_bytes = decode_optional_cursor_token(cursor_token)
889 .map_err(QueryError::from_cursor_plan_error)?;
890 let cursor = plan
891 .prepare_cursor(cursor_bytes.as_deref())
892 .map_err(QueryError::from_executor_plan_error)?;
893
894 let (page, trace) = self
896 .with_metrics(|| {
897 self.load_executor::<E>()
898 .execute_paged_with_cursor_traced(plan, cursor)
899 })
900 .map_err(QueryError::execute)?;
901 let next_cursor = page
902 .next_cursor
903 .map(|token| {
904 let Some(token) = token.as_scalar() else {
905 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
906 };
907
908 token.encode().map_err(|err| {
909 QueryError::serialize_internal(format!(
910 "failed to serialize continuation cursor: {err}"
911 ))
912 })
913 })
914 .transpose()?;
915
916 Ok(PagedLoadExecutionWithTrace::new(
917 page.items,
918 next_cursor,
919 trace,
920 ))
921 }
922
923 pub(in crate::db) fn execute_grouped<E>(
928 &self,
929 query: &Query<E>,
930 cursor_token: Option<&str>,
931 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
932 where
933 E: PersistedRow<Canister = C> + EntityValue,
934 {
935 let plan = self
937 .cached_prepared_query_plan_for_entity::<E>(query.structural())?
938 .0;
939
940 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
943
944 Self::finalize_grouped_execution_page(page, trace)
945 }
946
947 fn execute_grouped_plan_with<E, T>(
950 &self,
951 plan: PreparedExecutionPlan<E>,
952 cursor_token: Option<&str>,
953 op: impl FnOnce(
954 LoadExecutor<E>,
955 PreparedExecutionPlan<E>,
956 crate::db::cursor::GroupedPlannedCursor,
957 ) -> Result<T, InternalError>,
958 ) -> Result<T, QueryError>
959 where
960 E: PersistedRow<Canister = C> + EntityValue,
961 {
962 Self::ensure_grouped_execution_family(
964 plan.execution_family().map_err(QueryError::execute)?,
965 )?;
966
967 let cursor = decode_optional_grouped_cursor_token(cursor_token)
969 .map_err(QueryError::from_cursor_plan_error)?;
970 let cursor = plan
971 .prepare_grouped_cursor_token(cursor)
972 .map_err(QueryError::from_executor_plan_error)?;
973
974 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
977 .map_err(QueryError::execute)
978 }
979
980 fn execute_grouped_plan_with_trace<E>(
982 &self,
983 plan: PreparedExecutionPlan<E>,
984 cursor_token: Option<&str>,
985 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
986 where
987 E: PersistedRow<Canister = C> + EntityValue,
988 {
989 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
990 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
991 })
992 }
993}
994
995impl QueryPlanCacheKey {
996 const fn from_authority_parts(
1000 authority: crate::db::executor::EntityAuthority,
1001 schema_fingerprint: CommitSchemaFingerprint,
1002 visibility: QueryPlanVisibility,
1003 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1004 cache_method_version: u8,
1005 ) -> Self {
1006 Self {
1007 cache_method_version,
1008 entity_path: authority.entity_path(),
1009 schema_fingerprint,
1010 visibility,
1011 structural_query,
1012 }
1013 }
1014
1015 #[cfg(test)]
1016 fn for_authority_with_method_version(
1017 authority: crate::db::executor::EntityAuthority,
1018 schema_fingerprint: CommitSchemaFingerprint,
1019 visibility: QueryPlanVisibility,
1020 query: &StructuralQuery,
1021 cache_method_version: u8,
1022 ) -> Self {
1023 Self::from_authority_parts(
1024 authority,
1025 schema_fingerprint,
1026 visibility,
1027 query.structural_cache_key(),
1028 cache_method_version,
1029 )
1030 }
1031
1032 fn for_authority_with_normalized_predicate_and_method_version(
1033 authority: crate::db::executor::EntityAuthority,
1034 schema_fingerprint: CommitSchemaFingerprint,
1035 visibility: QueryPlanVisibility,
1036 query: &StructuralQuery,
1037 normalized_predicate: Option<&crate::db::predicate::Predicate>,
1038 cache_method_version: u8,
1039 ) -> Self {
1040 Self::from_authority_parts(
1041 authority,
1042 schema_fingerprint,
1043 visibility,
1044 query.structural_cache_key_with_normalized_predicate(normalized_predicate),
1045 cache_method_version,
1046 )
1047 }
1048}