1#[cfg(feature = "diagnostics")]
8use crate::db::executor::{
9 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
10};
11use crate::{
12 db::{
13 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
14 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
15 TraceReuseArtifactClass, TraceReuseEvent,
16 access::AccessStrategy,
17 commit::CommitSchemaFingerprint,
18 cursor::{
19 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
20 },
21 diagnostics::ExecutionTrace,
22 executor::{
23 ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
24 SharedPreparedExecutionPlan,
25 },
26 predicate::predicate_fingerprint_normalized,
27 query::builder::{
28 PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
29 },
30 query::explain::{
31 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
32 },
33 query::{
34 intent::{CompiledQuery, PlannedQuery, StructuralQuery},
35 plan::{QueryMode, VisibleIndexes},
36 },
37 },
38 error::InternalError,
39 model::entity::EntityModel,
40 traits::{CanisterKind, EntityKind, EntityValue, Path},
41};
42#[cfg(feature = "diagnostics")]
43use candid::CandidType;
44#[cfg(feature = "diagnostics")]
45use serde::Deserialize;
46use std::{cell::RefCell, collections::HashMap};
47
48const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 2;
51
52#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
53pub(in crate::db) enum QueryPlanVisibility {
54 StoreNotReady,
55 StoreReady,
56}
57
58#[derive(Clone, Debug, Eq, Hash, PartialEq)]
59pub(in crate::db) struct QueryPlanCacheKey {
60 cache_method_version: u8,
61 entity_path: &'static str,
62 schema_fingerprint: CommitSchemaFingerprint,
63 visibility: QueryPlanVisibility,
64 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
65}
66
67pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan>;
68
69thread_local! {
70 static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
75 RefCell::new(HashMap::default());
76}
77
78#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
79pub(in crate::db) struct QueryPlanCacheAttribution {
80 pub hits: u64,
81 pub misses: u64,
82}
83
84impl QueryPlanCacheAttribution {
85 #[must_use]
86 const fn hit() -> Self {
87 Self { hits: 1, misses: 0 }
88 }
89
90 #[must_use]
91 const fn miss() -> Self {
92 Self { hits: 0, misses: 1 }
93 }
94}
95
96pub(in crate::db::session) const fn query_plan_cache_reuse_event(
99 attribution: QueryPlanCacheAttribution,
100) -> TraceReuseEvent {
101 if attribution.hits > 0 {
102 TraceReuseEvent::hit(TraceReuseArtifactClass::SharedPreparedQueryPlan)
103 } else {
104 TraceReuseEvent::miss(TraceReuseArtifactClass::SharedPreparedQueryPlan)
105 }
106}
107
108#[cfg(feature = "diagnostics")]
115#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
116pub struct QueryExecutionAttribution {
117 pub compile_local_instructions: u64,
118 pub runtime_local_instructions: u64,
119 pub finalize_local_instructions: u64,
120 pub direct_data_row_scan_local_instructions: u64,
121 pub direct_data_row_key_stream_local_instructions: u64,
122 pub direct_data_row_row_read_local_instructions: u64,
123 pub direct_data_row_key_encode_local_instructions: u64,
124 pub direct_data_row_store_get_local_instructions: u64,
125 pub direct_data_row_order_window_local_instructions: u64,
126 pub direct_data_row_page_window_local_instructions: u64,
127 pub grouped_stream_local_instructions: u64,
128 pub grouped_fold_local_instructions: u64,
129 pub grouped_finalize_local_instructions: u64,
130 pub grouped_count_borrowed_hash_computations: u64,
131 pub grouped_count_bucket_candidate_checks: u64,
132 pub grouped_count_existing_group_hits: u64,
133 pub grouped_count_new_group_inserts: u64,
134 pub grouped_count_row_materialization_local_instructions: u64,
135 pub grouped_count_group_lookup_local_instructions: u64,
136 pub grouped_count_existing_group_update_local_instructions: u64,
137 pub grouped_count_new_group_insert_local_instructions: u64,
138 pub response_decode_local_instructions: u64,
139 pub execute_local_instructions: u64,
140 pub total_local_instructions: u64,
141 pub shared_query_plan_cache_hits: u64,
142 pub shared_query_plan_cache_misses: u64,
143}
144
145#[cfg(feature = "diagnostics")]
146#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
147struct QueryExecutePhaseAttribution {
148 runtime_local_instructions: u64,
149 finalize_local_instructions: u64,
150 direct_data_row_scan_local_instructions: u64,
151 direct_data_row_key_stream_local_instructions: u64,
152 direct_data_row_row_read_local_instructions: u64,
153 direct_data_row_key_encode_local_instructions: u64,
154 direct_data_row_store_get_local_instructions: u64,
155 direct_data_row_order_window_local_instructions: u64,
156 direct_data_row_page_window_local_instructions: u64,
157 grouped_stream_local_instructions: u64,
158 grouped_fold_local_instructions: u64,
159 grouped_finalize_local_instructions: u64,
160 grouped_count: GroupedCountAttribution,
161}
162
163#[cfg(feature = "diagnostics")]
164#[expect(
165 clippy::missing_const_for_fn,
166 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
167)]
168fn read_query_local_instruction_counter() -> u64 {
169 #[cfg(target_arch = "wasm32")]
170 {
171 canic_cdk::api::performance_counter(1)
172 }
173
174 #[cfg(not(target_arch = "wasm32"))]
175 {
176 0
177 }
178}
179
180#[cfg(feature = "diagnostics")]
181fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
182 let start = read_query_local_instruction_counter();
183 let result = run();
184 let delta = read_query_local_instruction_counter().saturating_sub(start);
185
186 (delta, result)
187}
188
189impl<C: CanisterKind> DbSession<C> {
190 #[cfg(feature = "diagnostics")]
191 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
192 QueryExecutePhaseAttribution {
193 runtime_local_instructions: 0,
194 finalize_local_instructions: 0,
195 direct_data_row_scan_local_instructions: 0,
196 direct_data_row_key_stream_local_instructions: 0,
197 direct_data_row_row_read_local_instructions: 0,
198 direct_data_row_key_encode_local_instructions: 0,
199 direct_data_row_store_get_local_instructions: 0,
200 direct_data_row_order_window_local_instructions: 0,
201 direct_data_row_page_window_local_instructions: 0,
202 grouped_stream_local_instructions: 0,
203 grouped_fold_local_instructions: 0,
204 grouped_finalize_local_instructions: 0,
205 grouped_count: GroupedCountAttribution::none(),
206 }
207 }
208
209 #[cfg(feature = "diagnostics")]
210 const fn scalar_query_execute_phase_attribution(
211 phase: ScalarExecutePhaseAttribution,
212 ) -> QueryExecutePhaseAttribution {
213 QueryExecutePhaseAttribution {
214 runtime_local_instructions: phase.runtime_local_instructions,
215 finalize_local_instructions: phase.finalize_local_instructions,
216 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
217 direct_data_row_key_stream_local_instructions: phase
218 .direct_data_row_key_stream_local_instructions,
219 direct_data_row_row_read_local_instructions: phase
220 .direct_data_row_row_read_local_instructions,
221 direct_data_row_key_encode_local_instructions: phase
222 .direct_data_row_key_encode_local_instructions,
223 direct_data_row_store_get_local_instructions: phase
224 .direct_data_row_store_get_local_instructions,
225 direct_data_row_order_window_local_instructions: phase
226 .direct_data_row_order_window_local_instructions,
227 direct_data_row_page_window_local_instructions: phase
228 .direct_data_row_page_window_local_instructions,
229 grouped_stream_local_instructions: 0,
230 grouped_fold_local_instructions: 0,
231 grouped_finalize_local_instructions: 0,
232 grouped_count: GroupedCountAttribution::none(),
233 }
234 }
235
236 #[cfg(feature = "diagnostics")]
237 const fn grouped_query_execute_phase_attribution(
238 phase: GroupedExecutePhaseAttribution,
239 ) -> QueryExecutePhaseAttribution {
240 QueryExecutePhaseAttribution {
241 runtime_local_instructions: phase
242 .stream_local_instructions
243 .saturating_add(phase.fold_local_instructions),
244 finalize_local_instructions: phase.finalize_local_instructions,
245 direct_data_row_scan_local_instructions: 0,
246 direct_data_row_key_stream_local_instructions: 0,
247 direct_data_row_row_read_local_instructions: 0,
248 direct_data_row_key_encode_local_instructions: 0,
249 direct_data_row_store_get_local_instructions: 0,
250 direct_data_row_order_window_local_instructions: 0,
251 direct_data_row_page_window_local_instructions: 0,
252 grouped_stream_local_instructions: phase.stream_local_instructions,
253 grouped_fold_local_instructions: phase.fold_local_instructions,
254 grouped_finalize_local_instructions: phase.finalize_local_instructions,
255 grouped_count: phase.grouped_count,
256 }
257 }
258
259 fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
260 let scope_id = self.db.cache_scope_id();
261
262 QUERY_PLAN_CACHES.with(|caches| {
263 let mut caches = caches.borrow_mut();
264 let cache = caches.entry(scope_id).or_default();
265
266 f(cache)
267 })
268 }
269
270 const fn visible_indexes_for_model(
271 model: &'static EntityModel,
272 visibility: QueryPlanVisibility,
273 ) -> VisibleIndexes<'static> {
274 match visibility {
275 QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
276 QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
277 }
278 }
279
280 #[cfg(test)]
281 pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
282 self.with_query_plan_cache(|cache| cache.len())
283 }
284
285 #[cfg(test)]
286 pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
287 self.with_query_plan_cache(QueryPlanCache::clear);
288 }
289
290 pub(in crate::db) fn query_plan_visibility_for_store_path(
291 &self,
292 store_path: &'static str,
293 ) -> Result<QueryPlanVisibility, QueryError> {
294 let store = self
295 .db
296 .recovered_store(store_path)
297 .map_err(QueryError::execute)?;
298 let visibility = if store.index_state() == crate::db::IndexState::Ready {
299 QueryPlanVisibility::StoreReady
300 } else {
301 QueryPlanVisibility::StoreNotReady
302 };
303
304 Ok(visibility)
305 }
306
307 pub(in crate::db) fn cached_shared_query_plan_for_authority(
308 &self,
309 authority: crate::db::executor::EntityAuthority,
310 schema_fingerprint: CommitSchemaFingerprint,
311 query: &StructuralQuery,
312 ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
313 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
314 let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
315 let planning_state = query.prepare_scalar_planning_state()?;
316 let normalized_predicate_fingerprint = planning_state
317 .normalized_predicate()
318 .map(predicate_fingerprint_normalized);
319 let cache_key =
320 QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
321 authority,
322 schema_fingerprint,
323 visibility,
324 query,
325 normalized_predicate_fingerprint,
326 SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
327 );
328
329 {
330 let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
331 if let Some(prepared_plan) = cached {
332 return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
333 }
334 }
335
336 let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
337 &visible_indexes,
338 planning_state,
339 )?;
340 let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
341 self.with_query_plan_cache(|cache| {
342 cache.insert(cache_key, prepared_plan.clone());
343 });
344
345 Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
346 }
347
348 #[cfg(test)]
349 pub(in crate::db) fn query_plan_cache_key_for_tests(
350 authority: crate::db::executor::EntityAuthority,
351 schema_fingerprint: CommitSchemaFingerprint,
352 visibility: QueryPlanVisibility,
353 query: &StructuralQuery,
354 cache_method_version: u8,
355 ) -> QueryPlanCacheKey {
356 QueryPlanCacheKey::for_authority_with_method_version(
357 authority,
358 schema_fingerprint,
359 visibility,
360 query,
361 cache_method_version,
362 )
363 }
364
365 fn with_query_visible_indexes<E, T>(
368 &self,
369 query: &Query<E>,
370 op: impl FnOnce(
371 &Query<E>,
372 &crate::db::query::plan::VisibleIndexes<'static>,
373 ) -> Result<T, QueryError>,
374 ) -> Result<T, QueryError>
375 where
376 E: EntityKind<Canister = C>,
377 {
378 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
379 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
380
381 op(query, &visible_indexes)
382 }
383
384 pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
385 &self,
386 query: &Query<E>,
387 ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
388 where
389 E: EntityKind<Canister = C>,
390 {
391 let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
392
393 Ok((prepared_plan.typed_clone::<E>(), attribution))
394 }
395
396 fn cached_shared_query_plan_for_entity<E>(
399 &self,
400 query: &Query<E>,
401 ) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
402 where
403 E: EntityKind<Canister = C>,
404 {
405 self.cached_shared_query_plan_for_authority(
406 crate::db::executor::EntityAuthority::for_type::<E>(),
407 crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
408 query.structural(),
409 )
410 }
411
412 fn map_cached_shared_query_plan_for_entity<E, T>(
415 &self,
416 query: &Query<E>,
417 map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
418 ) -> Result<T, QueryError>
419 where
420 E: EntityKind<Canister = C>,
421 {
422 let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
423
424 Ok(map(prepared_plan))
425 }
426
427 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
430 &self,
431 query: &Query<E>,
432 ) -> Result<CompiledQuery<E>, QueryError>
433 where
434 E: EntityKind<Canister = C>,
435 {
436 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
437 }
438
439 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
442 &self,
443 query: &Query<E>,
444 ) -> Result<PlannedQuery<E>, QueryError>
445 where
446 E: EntityKind<Canister = C>,
447 {
448 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
449 }
450
451 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
453 &self,
454 query: &Query<E>,
455 ) -> Result<ExplainPlan, QueryError>
456 where
457 E: EntityKind<Canister = C>,
458 {
459 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
460 }
461
462 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
465 &self,
466 query: &Query<E>,
467 ) -> Result<String, QueryError>
468 where
469 E: EntityKind<Canister = C>,
470 {
471 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
472 }
473
474 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
477 &self,
478 query: &Query<E>,
479 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
480 where
481 E: EntityValue + EntityKind<Canister = C>,
482 {
483 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
484 }
485
486 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
489 &self,
490 query: &Query<E>,
491 ) -> Result<String, QueryError>
492 where
493 E: EntityValue + EntityKind<Canister = C>,
494 {
495 self.with_query_visible_indexes(query, |query, visible_indexes| {
496 let (prepared_plan, cache_attribution) =
497 self.cached_prepared_query_plan_for_entity(query)?;
498 let mut plan = prepared_plan.logical_plan().clone();
499
500 plan.finalize_access_choice_for_model_with_indexes(
504 query.structural().model(),
505 visible_indexes.as_slice(),
506 );
507
508 query
509 .structural()
510 .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
511 &plan,
512 Some(query_plan_cache_reuse_event(cache_attribution)),
513 |_| {},
514 )
515 .map(|diagnostics| diagnostics.render_text_verbose())
516 })
517 }
518
519 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
522 &self,
523 query: &Query<E>,
524 strategy: &S,
525 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
526 where
527 E: EntityValue + EntityKind<Canister = C>,
528 S: PreparedFluentAggregateExplainStrategy,
529 {
530 self.with_query_visible_indexes(query, |query, visible_indexes| {
531 query
532 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
533 })
534 }
535
536 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
539 &self,
540 query: &Query<E>,
541 target_field: &str,
542 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
543 where
544 E: EntityValue + EntityKind<Canister = C>,
545 {
546 self.with_query_visible_indexes(query, |query, visible_indexes| {
547 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
548 })
549 }
550
551 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
554 &self,
555 query: &Query<E>,
556 strategy: &PreparedFluentProjectionStrategy,
557 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
558 where
559 E: EntityValue + EntityKind<Canister = C>,
560 {
561 self.with_query_visible_indexes(query, |query, visible_indexes| {
562 query.explain_prepared_projection_terminal_with_visible_indexes(
563 visible_indexes,
564 strategy,
565 )
566 })
567 }
568
569 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
572 match family {
573 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
574 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
575 )),
576 ExecutionFamily::Ordered => Ok(()),
577 ExecutionFamily::Grouped => Err(QueryError::invariant(
578 "grouped queries execute via execute(), not page().execute()",
579 )),
580 }
581 }
582
583 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
586 match family {
587 ExecutionFamily::Grouped => Ok(()),
588 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
589 "grouped execution requires grouped logical plans",
590 )),
591 }
592 }
593
594 fn finalize_grouped_execution_page(
598 page: GroupedCursorPage,
599 trace: Option<ExecutionTrace>,
600 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
601 let next_cursor = page
602 .next_cursor
603 .map(|token| {
604 let Some(token) = token.as_grouped() else {
605 return Err(QueryError::grouped_paged_emitted_scalar_continuation());
606 };
607
608 token.encode().map_err(|err| {
609 QueryError::serialize_internal(format!(
610 "failed to serialize grouped continuation cursor: {err}"
611 ))
612 })
613 })
614 .transpose()?;
615
616 Ok(PagedGroupedExecutionWithTrace::new(
617 page.rows,
618 next_cursor,
619 trace,
620 ))
621 }
622
623 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
625 where
626 E: PersistedRow<Canister = C> + EntityValue,
627 {
628 let mode = query.mode();
630 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
631
632 self.execute_query_dyn(mode, plan)
634 }
635
636 #[cfg(feature = "diagnostics")]
639 #[doc(hidden)]
640 #[expect(
641 clippy::too_many_lines,
642 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
643 )]
644 pub fn execute_query_result_with_attribution<E>(
645 &self,
646 query: &Query<E>,
647 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
648 where
649 E: PersistedRow<Canister = C> + EntityValue,
650 {
651 let (compile_local_instructions, plan_and_cache) =
656 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
657 let (plan, cache_attribution) = plan_and_cache?;
658
659 let (execute_local_instructions, result) = measure_query_stage(
662 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
663 if query.has_grouping() {
664 let (page, trace, phase_attribution) =
665 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
666 executor
667 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
668 plan, cursor,
669 )
670 })?;
671 let grouped = Self::finalize_grouped_execution_page(page, trace)?;
672
673 Ok((
674 LoadQueryResult::Grouped(grouped),
675 Self::grouped_query_execute_phase_attribution(phase_attribution),
676 0,
677 ))
678 } else {
679 match query.mode() {
680 QueryMode::Load(_) => {
681 let (rows, phase_attribution, response_decode_local_instructions) =
682 self.load_executor::<E>()
683 .execute_with_phase_attribution(plan)
684 .map_err(QueryError::execute)?;
685
686 Ok((
687 LoadQueryResult::Rows(rows),
688 Self::scalar_query_execute_phase_attribution(phase_attribution),
689 response_decode_local_instructions,
690 ))
691 }
692 QueryMode::Delete(_) => {
693 let result = self.execute_query_dyn(query.mode(), plan)?;
694
695 Ok((
696 LoadQueryResult::Rows(result),
697 Self::empty_query_execute_phase_attribution(),
698 0,
699 ))
700 }
701 }
702 }
703 },
704 );
705 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
706 let total_local_instructions =
707 compile_local_instructions.saturating_add(execute_local_instructions);
708
709 Ok((
710 result,
711 QueryExecutionAttribution {
712 compile_local_instructions,
713 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
714 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
715 direct_data_row_scan_local_instructions: execute_phase_attribution
716 .direct_data_row_scan_local_instructions,
717 direct_data_row_key_stream_local_instructions: execute_phase_attribution
718 .direct_data_row_key_stream_local_instructions,
719 direct_data_row_row_read_local_instructions: execute_phase_attribution
720 .direct_data_row_row_read_local_instructions,
721 direct_data_row_key_encode_local_instructions: execute_phase_attribution
722 .direct_data_row_key_encode_local_instructions,
723 direct_data_row_store_get_local_instructions: execute_phase_attribution
724 .direct_data_row_store_get_local_instructions,
725 direct_data_row_order_window_local_instructions: execute_phase_attribution
726 .direct_data_row_order_window_local_instructions,
727 direct_data_row_page_window_local_instructions: execute_phase_attribution
728 .direct_data_row_page_window_local_instructions,
729 grouped_stream_local_instructions: execute_phase_attribution
730 .grouped_stream_local_instructions,
731 grouped_fold_local_instructions: execute_phase_attribution
732 .grouped_fold_local_instructions,
733 grouped_finalize_local_instructions: execute_phase_attribution
734 .grouped_finalize_local_instructions,
735 grouped_count_borrowed_hash_computations: execute_phase_attribution
736 .grouped_count
737 .borrowed_hash_computations,
738 grouped_count_bucket_candidate_checks: execute_phase_attribution
739 .grouped_count
740 .bucket_candidate_checks,
741 grouped_count_existing_group_hits: execute_phase_attribution
742 .grouped_count
743 .existing_group_hits,
744 grouped_count_new_group_inserts: execute_phase_attribution
745 .grouped_count
746 .new_group_inserts,
747 grouped_count_row_materialization_local_instructions: execute_phase_attribution
748 .grouped_count
749 .row_materialization_local_instructions,
750 grouped_count_group_lookup_local_instructions: execute_phase_attribution
751 .grouped_count
752 .group_lookup_local_instructions,
753 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
754 .grouped_count
755 .existing_group_update_local_instructions,
756 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
757 .grouped_count
758 .new_group_insert_local_instructions,
759 response_decode_local_instructions,
760 execute_local_instructions,
761 total_local_instructions,
762 shared_query_plan_cache_hits: cache_attribution.hits,
763 shared_query_plan_cache_misses: cache_attribution.misses,
764 },
765 ))
766 }
767
768 #[doc(hidden)]
771 pub fn execute_query_result<E>(
772 &self,
773 query: &Query<E>,
774 ) -> Result<LoadQueryResult<E>, QueryError>
775 where
776 E: PersistedRow<Canister = C> + EntityValue,
777 {
778 if query.has_grouping() {
779 return self
780 .execute_grouped(query, None)
781 .map(LoadQueryResult::Grouped);
782 }
783
784 self.execute_query(query).map(LoadQueryResult::Rows)
785 }
786
787 #[doc(hidden)]
789 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
790 where
791 E: PersistedRow<Canister = C> + EntityValue,
792 {
793 if !query.mode().is_delete() {
795 return Err(QueryError::unsupported_query(
796 "delete count execution requires delete query mode",
797 ));
798 }
799
800 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
804
805 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
807 .map_err(QueryError::execute)
808 }
809
810 pub(in crate::db) fn execute_query_dyn<E>(
815 &self,
816 mode: QueryMode,
817 plan: PreparedExecutionPlan<E>,
818 ) -> Result<EntityResponse<E>, QueryError>
819 where
820 E: PersistedRow<Canister = C> + EntityValue,
821 {
822 let result = match mode {
823 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
824 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
825 };
826
827 result.map_err(QueryError::execute)
828 }
829
830 pub(in crate::db) fn execute_load_query_with<E, T>(
833 &self,
834 query: &Query<E>,
835 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
836 ) -> Result<T, QueryError>
837 where
838 E: PersistedRow<Canister = C> + EntityValue,
839 {
840 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
841
842 self.with_metrics(|| op(self.load_executor::<E>(), plan))
843 .map_err(QueryError::execute)
844 }
845
846 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
851 where
852 E: EntityKind<Canister = C>,
853 {
854 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
855 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
856 let (prepared_plan, cache_attribution) =
857 self.cached_prepared_query_plan_for_entity::<E>(query)?;
858 let logical_plan = prepared_plan.logical_plan();
859 let explain = logical_plan.explain();
860 let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
861 let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
862 let execution_family = match query.mode() {
863 QueryMode::Load(_) => Some(
864 prepared_plan
865 .execution_family()
866 .map_err(QueryError::execute)?,
867 ),
868 QueryMode::Delete(_) => None,
869 };
870 let reuse = query_plan_cache_reuse_event(cache_attribution);
871
872 Ok(QueryTracePlan::new(
873 plan_hash,
874 access_strategy,
875 execution_family,
876 reuse,
877 explain,
878 ))
879 }
880
881 pub(crate) fn execute_load_query_paged_with_trace<E>(
883 &self,
884 query: &Query<E>,
885 cursor_token: Option<&str>,
886 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
887 where
888 E: PersistedRow<Canister = C> + EntityValue,
889 {
890 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
892 Self::ensure_scalar_paged_execution_family(
893 plan.execution_family().map_err(QueryError::execute)?,
894 )?;
895
896 let cursor_bytes = decode_optional_cursor_token(cursor_token)
898 .map_err(QueryError::from_cursor_plan_error)?;
899 let cursor = plan
900 .prepare_cursor(cursor_bytes.as_deref())
901 .map_err(QueryError::from_executor_plan_error)?;
902
903 let (page, trace) = self
905 .with_metrics(|| {
906 self.load_executor::<E>()
907 .execute_paged_with_cursor_traced(plan, cursor)
908 })
909 .map_err(QueryError::execute)?;
910 let next_cursor = page
911 .next_cursor
912 .map(|token| {
913 let Some(token) = token.as_scalar() else {
914 return Err(QueryError::scalar_paged_emitted_grouped_continuation());
915 };
916
917 token.encode().map_err(|err| {
918 QueryError::serialize_internal(format!(
919 "failed to serialize continuation cursor: {err}"
920 ))
921 })
922 })
923 .transpose()?;
924
925 Ok(PagedLoadExecutionWithTrace::new(
926 page.items,
927 next_cursor,
928 trace,
929 ))
930 }
931
932 pub(in crate::db) fn execute_grouped<E>(
937 &self,
938 query: &Query<E>,
939 cursor_token: Option<&str>,
940 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
941 where
942 E: PersistedRow<Canister = C> + EntityValue,
943 {
944 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
946
947 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
950
951 Self::finalize_grouped_execution_page(page, trace)
952 }
953
954 fn execute_grouped_plan_with<E, T>(
957 &self,
958 plan: PreparedExecutionPlan<E>,
959 cursor_token: Option<&str>,
960 op: impl FnOnce(
961 LoadExecutor<E>,
962 PreparedExecutionPlan<E>,
963 crate::db::cursor::GroupedPlannedCursor,
964 ) -> Result<T, InternalError>,
965 ) -> Result<T, QueryError>
966 where
967 E: PersistedRow<Canister = C> + EntityValue,
968 {
969 Self::ensure_grouped_execution_family(
971 plan.execution_family().map_err(QueryError::execute)?,
972 )?;
973
974 let cursor = decode_optional_grouped_cursor_token(cursor_token)
976 .map_err(QueryError::from_cursor_plan_error)?;
977 let cursor = plan
978 .prepare_grouped_cursor_token(cursor)
979 .map_err(QueryError::from_executor_plan_error)?;
980
981 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
984 .map_err(QueryError::execute)
985 }
986
987 fn execute_grouped_plan_with_trace<E>(
989 &self,
990 plan: PreparedExecutionPlan<E>,
991 cursor_token: Option<&str>,
992 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
993 where
994 E: PersistedRow<Canister = C> + EntityValue,
995 {
996 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
997 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
998 })
999 }
1000}
1001
1002impl QueryPlanCacheKey {
1003 const fn from_authority_parts(
1007 authority: crate::db::executor::EntityAuthority,
1008 schema_fingerprint: CommitSchemaFingerprint,
1009 visibility: QueryPlanVisibility,
1010 structural_query: crate::db::query::intent::StructuralQueryCacheKey,
1011 cache_method_version: u8,
1012 ) -> Self {
1013 Self {
1014 cache_method_version,
1015 entity_path: authority.entity_path(),
1016 schema_fingerprint,
1017 visibility,
1018 structural_query,
1019 }
1020 }
1021
1022 #[cfg(test)]
1023 fn for_authority_with_method_version(
1024 authority: crate::db::executor::EntityAuthority,
1025 schema_fingerprint: CommitSchemaFingerprint,
1026 visibility: QueryPlanVisibility,
1027 query: &StructuralQuery,
1028 cache_method_version: u8,
1029 ) -> Self {
1030 Self::from_authority_parts(
1031 authority,
1032 schema_fingerprint,
1033 visibility,
1034 query.structural_cache_key(),
1035 cache_method_version,
1036 )
1037 }
1038
1039 fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
1040 authority: crate::db::executor::EntityAuthority,
1041 schema_fingerprint: CommitSchemaFingerprint,
1042 visibility: QueryPlanVisibility,
1043 query: &StructuralQuery,
1044 normalized_predicate_fingerprint: Option<[u8; 32]>,
1045 cache_method_version: u8,
1046 ) -> Self {
1047 Self::from_authority_parts(
1048 authority,
1049 schema_fingerprint,
1050 visibility,
1051 query.structural_cache_key_with_normalized_predicate_fingerprint(
1052 normalized_predicate_fingerprint,
1053 ),
1054 cache_method_version,
1055 )
1056 }
1057}