1#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12 db::{
13 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15 TraceReuseArtifactClass, TraceReuseEvent,
16 access::AccessStrategy,
17 commit::CommitSchemaFingerprint,
18 cursor::{
19 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
20 },
21 diagnostics::ExecutionTrace,
22 executor::{
23 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
24 SharedPreparedExecutionPlan,
25 },
26 predicate::predicate_fingerprint_normalized,
27 query::builder::{
28 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
29 },
30 query::explain::{
31 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
32 },
33 query::{
34 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
35 plan::{QueryMode, VisibleIndexes},
36 },
37 },
38 error::InternalError,
39 model::entity::EntityModel,
40 traits::{CanisterKind, EntityKind, EntityValue, Path},
41};
42#[cfg(feature = "diagnostics")]
43use candid::CandidType;
44#[cfg(feature = "diagnostics")]
45use serde::Deserialize;
46use std::{cell::RefCell, collections::HashMap};
47
48const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 2;
51
52#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
53pub(in crate::db) enum QueryPlanVisibility {
54 StoreNotReady,
55 StoreReady,
56}
57
58#[derive(Clone, Debug, Eq, Hash, PartialEq)]
59pub(in crate::db) struct QueryPlanCacheKey {
60 cache_method_version: u8,
61 entity_path: &'static str,
62 schema_fingerprint: CommitSchemaFingerprint,
63 visibility: QueryPlanVisibility,
64 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
65}
66
67pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan>;
68
69thread_local! {
70 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
75 RefCell::new(HashMap::default());
76}
77
78#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
79pub(in crate::db) struct QueryPlanCacheAttribution {
80 pub hits: u64,
81 pub misses: u64,
82}
83
84impl QueryPlanCacheAttribution {
85 #[must_use]
86 const fn hit() -> Self {
87 Self { hits: 1, misses: 0 }
88 }
89
90 #[must_use]
91 const fn miss() -> Self {
92 Self { hits: 0, misses: 1 }
93 }
94}
95
96pub(in crate::db::session) const fn query_plan_cache_reuse_event(
99 attribution: QueryPlanCacheAttribution,
100) -> TraceReuseEvent {
101 if attribution.hits > 0 {
102 TraceReuseEvent::hit(TraceReuseArtifactClass::SharedPreparedQueryPlan)
103 } else {
104 TraceReuseEvent::miss(TraceReuseArtifactClass::SharedPreparedQueryPlan)
105 }
106}
107
108#[cfg(feature = "diagnostics")]
115#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
116pub struct QueryExecutionAttribution {
117 pub compile_local_instructions: u64,
118 pub runtime_local_instructions: u64,
119 pub finalize_local_instructions: u64,
120 pub direct_data_row_scan_local_instructions: u64,
121 pub direct_data_row_key_stream_local_instructions: u64,
122 pub direct_data_row_row_read_local_instructions: u64,
123 pub direct_data_row_key_encode_local_instructions: u64,
124 pub direct_data_row_store_get_local_instructions: u64,
125 pub direct_data_row_order_window_local_instructions: u64,
126 pub direct_data_row_page_window_local_instructions: u64,
127 pub grouped_stream_local_instructions: u64,
128 pub grouped_fold_local_instructions: u64,
129 pub grouped_finalize_local_instructions: u64,
130 pub grouped_count_borrowed_hash_computations: u64,
131 pub grouped_count_bucket_candidate_checks: u64,
132 pub grouped_count_existing_group_hits: u64,
133 pub grouped_count_new_group_inserts: u64,
134 pub grouped_count_row_materialization_local_instructions: u64,
135 pub grouped_count_group_lookup_local_instructions: u64,
136 pub grouped_count_existing_group_update_local_instructions: u64,
137 pub grouped_count_new_group_insert_local_instructions: u64,
138 pub response_decode_local_instructions: u64,
139 pub execute_local_instructions: u64,
140 pub total_local_instructions: u64,
141 pub shared_query_plan_cache_hits: u64,
142 pub shared_query_plan_cache_misses: u64,
143}
144
145#[cfg(feature = "diagnostics")]
146#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
147struct QueryExecutePhaseAttribution {
148 runtime_local_instructions: u64,
149 finalize_local_instructions: u64,
150 direct_data_row_scan_local_instructions: u64,
151 direct_data_row_key_stream_local_instructions: u64,
152 direct_data_row_row_read_local_instructions: u64,
153 direct_data_row_key_encode_local_instructions: u64,
154 direct_data_row_store_get_local_instructions: u64,
155 direct_data_row_order_window_local_instructions: u64,
156 direct_data_row_page_window_local_instructions: u64,
157 grouped_stream_local_instructions: u64,
158 grouped_fold_local_instructions: u64,
159 grouped_finalize_local_instructions: u64,
160 grouped_count: GroupedCountAttribution,
161}
162
163#[cfg(feature = "diagnostics")]
164#[expect(
165 clippy::missing_const_for_fn,
166 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
167)]
168fn read_query_local_instruction_counter() -> u64 {
169 #[cfg(target_arch = "wasm32")]
170 {
171 canic_cdk::api::performance_counter(1)
172 }
173
174 #[cfg(not(target_arch = "wasm32"))]
175 {
176 0
177 }
178}
179
180#[cfg(feature = "diagnostics")]
181fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
182 let start = read_query_local_instruction_counter();
183 let result = run();
184 let delta = read_query_local_instruction_counter().saturating_sub(start);
185
186 (delta, result)
187}
188
189impl<C: CanisterKind> DbSession<C> {
190 #[cfg(feature = "diagnostics")]
191 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
192 QueryExecutePhaseAttribution {
193 runtime_local_instructions: 0,
194 finalize_local_instructions: 0,
195 direct_data_row_scan_local_instructions: 0,
196 direct_data_row_key_stream_local_instructions: 0,
197 direct_data_row_row_read_local_instructions: 0,
198 direct_data_row_key_encode_local_instructions: 0,
199 direct_data_row_store_get_local_instructions: 0,
200 direct_data_row_order_window_local_instructions: 0,
201 direct_data_row_page_window_local_instructions: 0,
202 grouped_stream_local_instructions: 0,
203 grouped_fold_local_instructions: 0,
204 grouped_finalize_local_instructions: 0,
205 grouped_count: GroupedCountAttribution::none(),
206 }
207 }
208
209 #[cfg(feature = "diagnostics")]
210 const fn scalar_query_execute_phase_attribution(
211 phase: ScalarExecutePhaseAttribution,
212 ) -> QueryExecutePhaseAttribution {
213 QueryExecutePhaseAttribution {
214 runtime_local_instructions: phase.runtime_local_instructions,
215 finalize_local_instructions: phase.finalize_local_instructions,
216 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
217 direct_data_row_key_stream_local_instructions: phase
218 .direct_data_row_key_stream_local_instructions,
219 direct_data_row_row_read_local_instructions: phase
220 .direct_data_row_row_read_local_instructions,
221 direct_data_row_key_encode_local_instructions: phase
222 .direct_data_row_key_encode_local_instructions,
223 direct_data_row_store_get_local_instructions: phase
224 .direct_data_row_store_get_local_instructions,
225 direct_data_row_order_window_local_instructions: phase
226 .direct_data_row_order_window_local_instructions,
227 direct_data_row_page_window_local_instructions: phase
228 .direct_data_row_page_window_local_instructions,
229 grouped_stream_local_instructions: 0,
230 grouped_fold_local_instructions: 0,
231 grouped_finalize_local_instructions: 0,
232 grouped_count: GroupedCountAttribution::none(),
233 }
234 }
235
236 #[cfg(feature = "diagnostics")]
237 const fn grouped_query_execute_phase_attribution(
238 phase: GroupedExecutePhaseAttribution,
239 ) -> QueryExecutePhaseAttribution {
240 QueryExecutePhaseAttribution {
241 runtime_local_instructions: phase
242 .stream_local_instructions
243 .saturating_add(phase.fold_local_instructions),
244 finalize_local_instructions: phase.finalize_local_instructions,
245 direct_data_row_scan_local_instructions: 0,
246 direct_data_row_key_stream_local_instructions: 0,
247 direct_data_row_row_read_local_instructions: 0,
248 direct_data_row_key_encode_local_instructions: 0,
249 direct_data_row_store_get_local_instructions: 0,
250 direct_data_row_order_window_local_instructions: 0,
251 direct_data_row_page_window_local_instructions: 0,
252 grouped_stream_local_instructions: phase.stream_local_instructions,
253 grouped_fold_local_instructions: phase.fold_local_instructions,
254 grouped_finalize_local_instructions: phase.finalize_local_instructions,
255 grouped_count: phase.grouped_count,
256 }
257 }
258
259 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
260 let scope_id = self.db.cache_scope_id();
261
262 QUERY_PLAN_CACHES.with(|caches| {
263 let mut caches = caches.borrow_mut();
264 let cache = caches.entry(scope_id).or_default();
265
266 f(cache)
267 })
268 }
269
270 const fn visible_indexes_for_model(
271 model: &'static EntityModel,
272 visibility: QueryPlanVisibility,
273 ) -> VisibleIndexes<'static> {
274 match visibility {
275 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
276 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
277 }
278 }
279
280 #[cfg(test)]
281 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
282 self.with_query_plan_cache(|cache| cache.len())
283 }
284
285 #[cfg(test)]
286 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
287 self.with_query_plan_cache(QueryPlanCache::clear);
288 }
289
290 pub(in crate::db) fn query_plan_visibility_for_store_path(
291 &self,
292 store_path: &'static str,
293 ) -> Result<QueryPlanVisibility, QueryError> {
294 let store = self
295 .db
296 .recovered_store(store_path)
297 .map_err(QueryError::execute)?;
298 let visibility = if store.index_state() == crate::db::IndexState::Ready {
299 QueryPlanVisibility::StoreReady
300 } else {
301 QueryPlanVisibility::StoreNotReady
302 };
303
304 Ok(visibility)
305 }
306
307 pub(in crate::db) fn cached_shared_query_plan_for_authority(
308 &self,
309 authority: crate::db::executor::EntityAuthority,
310 schema_fingerprint: CommitSchemaFingerprint,
311 query: &StructuralQuery,
312 ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
313 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
314 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
315 let planning_state = query.prepare_scalar_planning_state()?;
316 let normalized_predicate_fingerprint = planning_state
317 .normalized_predicate()
318 .map(predicate_fingerprint_normalized);
319 let cache_key =
320 QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
321 authority,
322 schema_fingerprint,
323 visibility,
324 query,
325 normalized_predicate_fingerprint,
326 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
327 );
328
329 {
330 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
331 if let Some(prepared_plan) = cached {
332 return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
333 }
334 }
335
336 let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
337 &visible_indexes,
338 planning_state,
339 )?;
340 let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
341 self.with_query_plan_cache(|cache| {
342 cache.insert(cache_key, prepared_plan.clone());
343 });
344
345 Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
346 }
347
348 #[cfg(test)]
349 pub(in crate::db) fn query_plan_cache_key_for_tests(
350 authority: crate::db::executor::EntityAuthority,
351 schema_fingerprint: CommitSchemaFingerprint,
352 visibility: QueryPlanVisibility,
353 query: &StructuralQuery,
354 cache_method_version: u8,
355 ) -> QueryPlanCacheKey {
356 QueryPlanCacheKey::for_authority_with_method_version(
357 authority,
358 schema_fingerprint,
359 visibility,
360 query,
361 cache_method_version,
362 )
363 }
364
365 fn with_query_visible_indexes<E, T>(
368 &self,
369 query: &Query<E>,
370 op: impl FnOnce(
371 &Query<E>,
372 &crate::db::query::plan::VisibleIndexes<'static>,
373 ) -> Result<T, QueryError>,
374 ) -> Result<T, QueryError>
375 where
376 E: EntityKind<Canister = C>,
377 {
378 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
379 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
380
381 op(query, &visible_indexes)
382 }
383
384 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
385 &self,
386 query: &Query<E>,
387 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
388 where
389 E: EntityKind<Canister = C>,
390 {
391 let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
392
393 Ok((prepared_plan.typed_clone::<E>(), attribution))
394 }
395
396 fn cached_shared_query_plan_for_entity<E>(
399 &self,
400 query: &Query<E>,
401 ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
402 where
403 E: EntityKind<Canister = C>,
404 {
405 self.cached_shared_query_plan_for_authority(
406 crate::db::executor::EntityAuthority::for_type::<E>(),
407 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
408 query.structural(),
409 )
410 }
411
412 fn map_cached_shared_query_plan_for_entity<E, T>(
415 &self,
416 query: &Query<E>,
417 map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
418 ) -> Result<T, QueryError>
419 where
420 E: EntityKind<Canister = C>,
421 {
422 let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
423
424 Ok(map(prepared_plan))
425 }
426
427 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
430 &self,
431 query: &Query<E>,
432 ) -> Result<CompiledQuery<E>, QueryError>
433 where
434 E: EntityKind<Canister = C>,
435 {
436 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
437 }
438
439 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
442 &self,
443 query: &Query<E>,
444 ) -> Result<PlannedQuery<E>, QueryError>
445 where
446 E: EntityKind<Canister = C>,
447 {
448 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
449 }
450
451 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
453 &self,
454 query: &Query<E>,
455 ) -> Result<ExplainPlan, QueryError>
456 where
457 E: EntityKind<Canister = C>,
458 {
459 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
460 }
461
462 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
465 &self,
466 query: &Query<E>,
467 ) -> Result<String, QueryError>
468 where
469 E: EntityKind<Canister = C>,
470 {
471 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
472 }
473
474 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
477 &self,
478 query: &Query<E>,
479 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
480 where
481 E: EntityValue + EntityKind<Canister = C>,
482 {
483 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
484 }
485
486 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
489 &self,
490 query: &Query<E>,
491 ) -> Result<String, QueryError>
492 where
493 E: EntityValue + EntityKind<Canister = C>,
494 {
495 self.with_query_visible_indexes(query, |query, visible_indexes| {
496 let (prepared_plan, cache_attribution) =
497 self.cached_prepared_query_plan_for_entity(query)?;
498 let mut plan = prepared_plan.logical_plan().clone();
499
500 plan.finalize_access_choice_for_model_with_indexes(
504 query.structural().model(),
505 visible_indexes.as_slice(),
506 );
507
508 query
509 .structural()
510 .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
511 &plan,
512 Some(query_plan_cache_reuse_event(cache_attribution)),
513 |_| {},
514 )
515 .map(|diagnostics| diagnostics.render_text_verbose())
516 })
517 }
518
519 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
522 &self,
523 query: &Query<E>,
524 strategy: &S,
525 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
526 where
527 E: EntityValue + EntityKind<Canister = C>,
528 S: PreparedFluentAggregateExplainStrategy,
529 {
530 self.with_query_visible_indexes(query, |query, visible_indexes| {
531 query
532 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
533 })
534 }
535
536 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
539 &self,
540 query: &Query<E>,
541 target_field: &str,
542 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
543 where
544 E: EntityValue + EntityKind<Canister = C>,
545 {
546 self.with_query_visible_indexes(query, |query, visible_indexes| {
547 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
548 })
549 }
550
551 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
554 &self,
555 query: &Query<E>,
556 strategy: &PreparedFluentProjectionStrategy,
557 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
558 where
559 E: EntityValue + EntityKind<Canister = C>,
560 {
561 self.with_query_visible_indexes(query, |query, visible_indexes| {
562 query.explain_prepared_projection_terminal_with_visible_indexes(
563 visible_indexes,
564 strategy,
565 )
566 })
567 }
568
569 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
572 match family {
573 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
574 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
575 )),
576 ExecutionFamily::Ordered => Ok(()),
577 ExecutionFamily::Grouped => Err(QueryError::invariant(
578 "grouped queries execute via execute(), not page().execute()",
579 )),
580 }
581 }
582
583 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
586 match family {
587 ExecutionFamily::Grouped => Ok(()),
588 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
589 "grouped execution requires grouped logical plans",
590 )),
591 }
592 }
593
594 fn finalize_grouped_execution_page(
598 page: GroupedCursorPage,
599 trace: Option<ExecutionTrace>,
600 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
601 let next_cursor = page
602 .next_cursor
603 .map(|token| {
604 let Some(token) = token.as_grouped() else {
605 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
606 };
607
608 token.encode().map_err(|err| {
609 QueryError::serialize_internal(format!(
610 "failed to serialize grouped continuation cursor: {err}"
611 ))
612 })
613 })
614 .transpose()?;
615
616 Ok(PagedGroupedExecutionWithTrace::new(
617 page.rows
618 .into_iter()
619 .map(crate::db::GroupedRow::from_runtime_row)
620 .collect(),
621 next_cursor,
622 trace,
623 ))
624 }
625
626 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
628 where
629 E: PersistedRow<Canister = C> + EntityValue,
630 {
631 let mode = query.mode();
633 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
634
635 self.execute_query_dyn(mode, plan)
637 }
638
639 #[cfg(feature = "diagnostics")]
642 #[doc(hidden)]
643 #[expect(
644 clippy::too_many_lines,
645 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
646 )]
647 pub fn execute_query_result_with_attribution<E>(
648 &self,
649 query: &Query<E>,
650 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
651 where
652 E: PersistedRow<Canister = C> + EntityValue,
653 {
654 let (compile_local_instructions, plan_and_cache) =
659 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
660 let (plan, cache_attribution) = plan_and_cache?;
661
662 let (execute_local_instructions, result) = measure_query_stage(
665 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
666 if query.has_grouping() {
667 let (page, trace, phase_attribution) =
668 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
669 executor
670 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
671 plan, cursor,
672 )
673 })?;
674 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
675
676 Ok((
677 LoadQueryResult::Grouped(grouped),
678 Self::grouped_query_execute_phase_attribution(phase_attribution),
679 0,
680 ))
681 } else {
682 match query.mode() {
683 QueryMode::Load(_) => {
684 let (rows, phase_attribution, response_decode_local_instructions) =
685 self.load_executor::<E>()
686 .execute_with_phase_attribution(plan)
687 .map_err(QueryError::execute)?;
688
689 Ok((
690 LoadQueryResult::Rows(rows),
691 Self::scalar_query_execute_phase_attribution(phase_attribution),
692 response_decode_local_instructions,
693 ))
694 }
695 QueryMode::Delete(_) => {
696 let result = self.execute_query_dyn(query.mode(), plan)?;
697
698 Ok((
699 LoadQueryResult::Rows(result),
700 Self::empty_query_execute_phase_attribution(),
701 0,
702 ))
703 }
704 }
705 }
706 },
707 );
708 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
709 let total_local_instructions =
710 compile_local_instructions.saturating_add(execute_local_instructions);
711
712 Ok((
713 result,
714 QueryExecutionAttribution {
715 compile_local_instructions,
716 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
717 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
718 direct_data_row_scan_local_instructions: execute_phase_attribution
719 .direct_data_row_scan_local_instructions,
720 direct_data_row_key_stream_local_instructions: execute_phase_attribution
721 .direct_data_row_key_stream_local_instructions,
722 direct_data_row_row_read_local_instructions: execute_phase_attribution
723 .direct_data_row_row_read_local_instructions,
724 direct_data_row_key_encode_local_instructions: execute_phase_attribution
725 .direct_data_row_key_encode_local_instructions,
726 direct_data_row_store_get_local_instructions: execute_phase_attribution
727 .direct_data_row_store_get_local_instructions,
728 direct_data_row_order_window_local_instructions: execute_phase_attribution
729 .direct_data_row_order_window_local_instructions,
730 direct_data_row_page_window_local_instructions: execute_phase_attribution
731 .direct_data_row_page_window_local_instructions,
732 grouped_stream_local_instructions: execute_phase_attribution
733 .grouped_stream_local_instructions,
734 grouped_fold_local_instructions: execute_phase_attribution
735 .grouped_fold_local_instructions,
736 grouped_finalize_local_instructions: execute_phase_attribution
737 .grouped_finalize_local_instructions,
738 grouped_count_borrowed_hash_computations: execute_phase_attribution
739 .grouped_count
740 .borrowed_hash_computations,
741 grouped_count_bucket_candidate_checks: execute_phase_attribution
742 .grouped_count
743 .bucket_candidate_checks,
744 grouped_count_existing_group_hits: execute_phase_attribution
745 .grouped_count
746 .existing_group_hits,
747 grouped_count_new_group_inserts: execute_phase_attribution
748 .grouped_count
749 .new_group_inserts,
750 grouped_count_row_materialization_local_instructions: execute_phase_attribution
751 .grouped_count
752 .row_materialization_local_instructions,
753 grouped_count_group_lookup_local_instructions: execute_phase_attribution
754 .grouped_count
755 .group_lookup_local_instructions,
756 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
757 .grouped_count
758 .existing_group_update_local_instructions,
759 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
760 .grouped_count
761 .new_group_insert_local_instructions,
762 response_decode_local_instructions,
763 execute_local_instructions,
764 total_local_instructions,
765 shared_query_plan_cache_hits: cache_attribution.hits,
766 shared_query_plan_cache_misses: cache_attribution.misses,
767 },
768 ))
769 }
770
771 #[doc(hidden)]
774 pub fn execute_query_result<E>(
775 &self,
776 query: &Query<E>,
777 ) -> Result<LoadQueryResult<E>, QueryError>
778 where
779 E: PersistedRow<Canister = C> + EntityValue,
780 {
781 if query.has_grouping() {
782 return self
783 .execute_grouped(query, None)
784 .map(LoadQueryResult::Grouped);
785 }
786
787 self.execute_query(query).map(LoadQueryResult::Rows)
788 }
789
790 #[doc(hidden)]
792 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
793 where
794 E: PersistedRow<Canister = C> + EntityValue,
795 {
796 if !query.mode().is_delete() {
798 return Err(QueryError::unsupported_query(
799 "delete count execution requires delete query mode",
800 ));
801 }
802
803 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
807
808 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
810 .map_err(QueryError::execute)
811 }
812
813 pub(in crate::db) fn execute_query_dyn<E>(
818 &self,
819 mode: QueryMode,
820 plan: PreparedExecutionPlan<E>,
821 ) -> Result<EntityResponse<E>, QueryError>
822 where
823 E: PersistedRow<Canister = C> + EntityValue,
824 {
825 let result = match mode {
826 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
827 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
828 };
829
830 result.map_err(QueryError::execute)
831 }
832
833 pub(in crate::db) fn execute_load_query_with<E, T>(
836 &self,
837 query: &Query<E>,
838 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
839 ) -> Result<T, QueryError>
840 where
841 E: PersistedRow<Canister = C> + EntityValue,
842 {
843 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
844
845 self.with_metrics(|| op(self.load_executor::<E>(), plan))
846 .map_err(QueryError::execute)
847 }
848
849 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
854 where
855 E: EntityKind<Canister = C>,
856 {
857 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
858 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
859 let (prepared_plan, cache_attribution) =
860 self.cached_prepared_query_plan_for_entity::<E>(query)?;
861 let logical_plan = prepared_plan.logical_plan();
862 let explain = logical_plan.explain();
863 let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
864 let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
865 let execution_family = match query.mode() {
866 QueryMode::Load(_) => Some(
867 prepared_plan
868 .execution_family()
869 .map_err(QueryError::execute)?,
870 ),
871 QueryMode::Delete(_) => None,
872 };
873 let reuse = query_plan_cache_reuse_event(cache_attribution);
874
875 Ok(QueryTracePlan::new(
876 plan_hash,
877 access_strategy,
878 execution_family,
879 reuse,
880 explain,
881 ))
882 }
883
884 pub(crate) fn execute_load_query_paged_with_trace<E>(
886 &self,
887 query: &Query<E>,
888 cursor_token: Option<&str>,
889 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
890 where
891 E: PersistedRow<Canister = C> + EntityValue,
892 {
893 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
895 Self::ensure_scalar_paged_execution_family(
896 plan.execution_family().map_err(QueryError::execute)?,
897 )?;
898
899 let cursor_bytes = decode_optional_cursor_token(cursor_token)
901 .map_err(QueryError::from_cursor_plan_error)?;
902 let cursor = plan
903 .prepare_cursor(cursor_bytes.as_deref())
904 .map_err(QueryError::from_executor_plan_error)?;
905
906 let (page, trace) = self
908 .with_metrics(|| {
909 self.load_executor::<E>()
910 .execute_paged_with_cursor_traced(plan, cursor)
911 })
912 .map_err(QueryError::execute)?;
913 let next_cursor = page
914 .next_cursor
915 .map(|token| {
916 let Some(token) = token.as_scalar() else {
917 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
918 };
919
920 token.encode().map_err(|err| {
921 QueryError::serialize_internal(format!(
922 "failed to serialize continuation cursor: {err}"
923 ))
924 })
925 })
926 .transpose()?;
927
928 Ok(PagedLoadExecutionWithTrace::new(
929 page.items,
930 next_cursor,
931 trace,
932 ))
933 }
934
935 pub(in crate::db) fn execute_grouped<E>(
940 &self,
941 query: &Query<E>,
942 cursor_token: Option<&str>,
943 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
944 where
945 E: PersistedRow<Canister = C> + EntityValue,
946 {
947 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
949
950 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
953
954 Self::finalize_grouped_execution_page(page, trace)
955 }
956
957 fn execute_grouped_plan_with<E, T>(
960 &self,
961 plan: PreparedExecutionPlan<E>,
962 cursor_token: Option<&str>,
963 op: impl FnOnce(
964 LoadExecutor<E>,
965 PreparedExecutionPlan<E>,
966 crate::db::cursor::GroupedPlannedCursor,
967 ) -> Result<T, InternalError>,
968 ) -> Result<T, QueryError>
969 where
970 E: PersistedRow<Canister = C> + EntityValue,
971 {
972 Self::ensure_grouped_execution_family(
974 plan.execution_family().map_err(QueryError::execute)?,
975 )?;
976
977 let cursor = decode_optional_grouped_cursor_token(cursor_token)
979 .map_err(QueryError::from_cursor_plan_error)?;
980 let cursor = plan
981 .prepare_grouped_cursor_token(cursor)
982 .map_err(QueryError::from_executor_plan_error)?;
983
984 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
987 .map_err(QueryError::execute)
988 }
989
990 fn execute_grouped_plan_with_trace<E>(
992 &self,
993 plan: PreparedExecutionPlan<E>,
994 cursor_token: Option<&str>,
995 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
996 where
997 E: PersistedRow<Canister = C> + EntityValue,
998 {
999 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1000 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1001 })
1002 }
1003}
1004
1005impl QueryPlanCacheKey {
1006 const fn from_authority_parts(
1010 authority: crate::db::executor::EntityAuthority,
1011 schema_fingerprint: CommitSchemaFingerprint,
1012 visibility: QueryPlanVisibility,
1013 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1014 cache_method_version: u8,
1015 ) -> Self {
1016 Self {
1017 cache_method_version,
1018 entity_path: authority.entity_path(),
1019 schema_fingerprint,
1020 visibility,
1021 structural_query,
1022 }
1023 }
1024
1025 #[cfg(test)]
1026 fn for_authority_with_method_version(
1027 authority: crate::db::executor::EntityAuthority,
1028 schema_fingerprint: CommitSchemaFingerprint,
1029 visibility: QueryPlanVisibility,
1030 query: &StructuralQuery,
1031 cache_method_version: u8,
1032 ) -> Self {
1033 Self::from_authority_parts(
1034 authority,
1035 schema_fingerprint,
1036 visibility,
1037 query.structural_cache_key(),
1038 cache_method_version,
1039 )
1040 }
1041
1042 fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1043 authority: crate::db::executor::EntityAuthority,
1044 schema_fingerprint: CommitSchemaFingerprint,
1045 visibility: QueryPlanVisibility,
1046 query: &StructuralQuery,
1047 normalized_predicate_fingerprint: Option<[u8; 32]>,
1048 cache_method_version: u8,
1049 ) -> Self {
1050 Self::from_authority_parts(
1051 authority,
1052 schema_fingerprint,
1053 visibility,
1054 query.structural_cache_key_with_normalized_predicate_fingerprint(
1055 normalized_predicate_fingerprint,
1056 ),
1057 cache_method_version,
1058 )
1059 }
1060}