1mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::executor::{
11 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
12};
13use crate::{
14 db::{
15 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
16 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
17 TraceExecutionFamily,
18 access::summarize_executable_access_plan,
19 cursor::{
20 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
21 },
22 diagnostics::ExecutionTrace,
23 executor::{
24 ExecutionFamily, ExecutorPlanError, GroupedCursorPage, LoadExecutor,
25 PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
26 ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
27 ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
28 },
29 query::builder::{
30 PreparedFluentAggregateExplainStrategy,
31 PreparedFluentExistingRowsTerminalRuntimeRequest,
32 PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldRuntimeRequest,
33 PreparedFluentNumericFieldStrategy, PreparedFluentOrderSensitiveTerminalRuntimeRequest,
34 PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
35 PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalRuntimeRequest,
36 PreparedFluentScalarTerminalStrategy,
37 },
38 query::explain::{
39 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40 },
41 query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42 query::{
43 intent::{CompiledQuery, PlannedQuery},
44 plan::{FieldSlot, QueryMode},
45 },
46 session::{finalize_grouped_paged_execution, finalize_scalar_paged_execution},
47 },
48 error::InternalError,
49 traits::{CanisterKind, EntityKind, EntityValue, Path},
50 types::{Decimal, Id},
51 value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65 match family {
66 ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67 ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68 ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69 }
70}
71
72pub(in crate::db::session) fn query_error_from_executor_plan_error(
75 err: ExecutorPlanError,
76) -> QueryError {
77 match err {
78 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79 }
80}
81
82#[cfg(feature = "diagnostics")]
92#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
93pub struct QueryExecutionAttribution {
94 pub compile_local_instructions: u64,
95 pub plan_lookup_local_instructions: u64,
96 pub executor_invocation_local_instructions: u64,
97 pub response_finalization_local_instructions: u64,
98 pub runtime_local_instructions: u64,
99 pub finalize_local_instructions: u64,
100 pub direct_data_row_scan_local_instructions: u64,
101 pub direct_data_row_key_stream_local_instructions: u64,
102 pub direct_data_row_row_read_local_instructions: u64,
103 pub direct_data_row_key_encode_local_instructions: u64,
104 pub direct_data_row_store_get_local_instructions: u64,
105 pub direct_data_row_order_window_local_instructions: u64,
106 pub direct_data_row_page_window_local_instructions: u64,
107 pub grouped_stream_local_instructions: u64,
108 pub grouped_fold_local_instructions: u64,
109 pub grouped_finalize_local_instructions: u64,
110 pub grouped_count_borrowed_hash_computations: u64,
111 pub grouped_count_bucket_candidate_checks: u64,
112 pub grouped_count_existing_group_hits: u64,
113 pub grouped_count_new_group_inserts: u64,
114 pub grouped_count_row_materialization_local_instructions: u64,
115 pub grouped_count_group_lookup_local_instructions: u64,
116 pub grouped_count_existing_group_update_local_instructions: u64,
117 pub grouped_count_new_group_insert_local_instructions: u64,
118 pub response_decode_local_instructions: u64,
119 pub execute_local_instructions: u64,
120 pub total_local_instructions: u64,
121 pub shared_query_plan_cache_hits: u64,
122 pub shared_query_plan_cache_misses: u64,
123}
124
125#[cfg(feature = "diagnostics")]
126#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
127struct QueryExecutePhaseAttribution {
128 executor_invocation_local_instructions: u64,
129 response_finalization_local_instructions: u64,
130 runtime_local_instructions: u64,
131 finalize_local_instructions: u64,
132 direct_data_row_scan_local_instructions: u64,
133 direct_data_row_key_stream_local_instructions: u64,
134 direct_data_row_row_read_local_instructions: u64,
135 direct_data_row_key_encode_local_instructions: u64,
136 direct_data_row_store_get_local_instructions: u64,
137 direct_data_row_order_window_local_instructions: u64,
138 direct_data_row_page_window_local_instructions: u64,
139 grouped_stream_local_instructions: u64,
140 grouped_fold_local_instructions: u64,
141 grouped_finalize_local_instructions: u64,
142 grouped_count: GroupedCountAttribution,
143}
144
145#[cfg(feature = "diagnostics")]
146#[expect(
147 clippy::missing_const_for_fn,
148 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
149)]
150fn read_query_local_instruction_counter() -> u64 {
151 #[cfg(target_arch = "wasm32")]
152 {
153 canic_cdk::api::performance_counter(1)
154 }
155
156 #[cfg(not(target_arch = "wasm32"))]
157 {
158 0
159 }
160}
161
162#[cfg(feature = "diagnostics")]
163fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
164 let start = read_query_local_instruction_counter();
165 let result = run();
166 let delta = read_query_local_instruction_counter().saturating_sub(start);
167
168 (delta, result)
169}
170
171impl<C: CanisterKind> DbSession<C> {
172 #[cfg(feature = "diagnostics")]
173 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
174 QueryExecutePhaseAttribution {
175 executor_invocation_local_instructions: 0,
176 response_finalization_local_instructions: 0,
177 runtime_local_instructions: 0,
178 finalize_local_instructions: 0,
179 direct_data_row_scan_local_instructions: 0,
180 direct_data_row_key_stream_local_instructions: 0,
181 direct_data_row_row_read_local_instructions: 0,
182 direct_data_row_key_encode_local_instructions: 0,
183 direct_data_row_store_get_local_instructions: 0,
184 direct_data_row_order_window_local_instructions: 0,
185 direct_data_row_page_window_local_instructions: 0,
186 grouped_stream_local_instructions: 0,
187 grouped_fold_local_instructions: 0,
188 grouped_finalize_local_instructions: 0,
189 grouped_count: GroupedCountAttribution::none(),
190 }
191 }
192
193 #[cfg(feature = "diagnostics")]
194 const fn scalar_query_execute_phase_attribution(
195 phase: ScalarExecutePhaseAttribution,
196 executor_invocation_local_instructions: u64,
197 ) -> QueryExecutePhaseAttribution {
198 QueryExecutePhaseAttribution {
199 executor_invocation_local_instructions,
200 response_finalization_local_instructions: 0,
201 runtime_local_instructions: phase.runtime_local_instructions,
202 finalize_local_instructions: phase.finalize_local_instructions,
203 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
204 direct_data_row_key_stream_local_instructions: phase
205 .direct_data_row_key_stream_local_instructions,
206 direct_data_row_row_read_local_instructions: phase
207 .direct_data_row_row_read_local_instructions,
208 direct_data_row_key_encode_local_instructions: phase
209 .direct_data_row_key_encode_local_instructions,
210 direct_data_row_store_get_local_instructions: phase
211 .direct_data_row_store_get_local_instructions,
212 direct_data_row_order_window_local_instructions: phase
213 .direct_data_row_order_window_local_instructions,
214 direct_data_row_page_window_local_instructions: phase
215 .direct_data_row_page_window_local_instructions,
216 grouped_stream_local_instructions: 0,
217 grouped_fold_local_instructions: 0,
218 grouped_finalize_local_instructions: 0,
219 grouped_count: GroupedCountAttribution::none(),
220 }
221 }
222
223 #[cfg(feature = "diagnostics")]
224 const fn grouped_query_execute_phase_attribution(
225 phase: GroupedExecutePhaseAttribution,
226 executor_invocation_local_instructions: u64,
227 response_finalization_local_instructions: u64,
228 ) -> QueryExecutePhaseAttribution {
229 QueryExecutePhaseAttribution {
230 executor_invocation_local_instructions,
231 response_finalization_local_instructions,
232 runtime_local_instructions: phase
233 .stream_local_instructions
234 .saturating_add(phase.fold_local_instructions),
235 finalize_local_instructions: phase.finalize_local_instructions,
236 direct_data_row_scan_local_instructions: 0,
237 direct_data_row_key_stream_local_instructions: 0,
238 direct_data_row_row_read_local_instructions: 0,
239 direct_data_row_key_encode_local_instructions: 0,
240 direct_data_row_store_get_local_instructions: 0,
241 direct_data_row_order_window_local_instructions: 0,
242 direct_data_row_page_window_local_instructions: 0,
243 grouped_stream_local_instructions: phase.stream_local_instructions,
244 grouped_fold_local_instructions: phase.fold_local_instructions,
245 grouped_finalize_local_instructions: phase.finalize_local_instructions,
246 grouped_count: phase.grouped_count,
247 }
248 }
249
250 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
253 &self,
254 query: &Query<E>,
255 ) -> Result<CompiledQuery<E>, QueryError>
256 where
257 E: EntityKind<Canister = C>,
258 {
259 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
260 }
261
262 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
265 &self,
266 query: &Query<E>,
267 ) -> Result<PlannedQuery<E>, QueryError>
268 where
269 E: EntityKind<Canister = C>,
270 {
271 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
272 }
273
274 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
276 &self,
277 query: &Query<E>,
278 ) -> Result<ExplainPlan, QueryError>
279 where
280 E: EntityKind<Canister = C>,
281 {
282 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
283 }
284
285 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
288 &self,
289 query: &Query<E>,
290 ) -> Result<String, QueryError>
291 where
292 E: EntityKind<Canister = C>,
293 {
294 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
295 }
296
297 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
300 &self,
301 query: &Query<E>,
302 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
303 where
304 E: EntityValue + EntityKind<Canister = C>,
305 {
306 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
307 }
308
309 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
312 &self,
313 query: &Query<E>,
314 ) -> Result<String, QueryError>
315 where
316 E: EntityValue + EntityKind<Canister = C>,
317 {
318 self.with_query_visible_indexes(query, |query, visible_indexes| {
319 let (prepared_plan, cache_attribution) =
320 self.cached_prepared_query_plan_for_entity(query)?;
321 let mut plan = prepared_plan.logical_plan().clone();
322
323 plan.finalize_access_choice_for_model_with_indexes(
327 query.structural().model(),
328 visible_indexes.as_slice(),
329 );
330
331 query
332 .structural()
333 .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
334 &plan,
335 Some(query_plan_cache_reuse_event(cache_attribution)),
336 |_| {},
337 )
338 .map(|diagnostics| diagnostics.render_text_verbose())
339 })
340 }
341
342 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
345 &self,
346 query: &Query<E>,
347 strategy: &S,
348 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
349 where
350 E: EntityValue + EntityKind<Canister = C>,
351 S: PreparedFluentAggregateExplainStrategy,
352 {
353 self.with_query_visible_indexes(query, |query, visible_indexes| {
354 query
355 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
356 })
357 }
358
359 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
362 &self,
363 query: &Query<E>,
364 target_field: &str,
365 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
366 where
367 E: EntityValue + EntityKind<Canister = C>,
368 {
369 self.with_query_visible_indexes(query, |query, visible_indexes| {
370 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
371 })
372 }
373
374 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
377 &self,
378 query: &Query<E>,
379 strategy: &PreparedFluentProjectionStrategy,
380 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
381 where
382 E: EntityValue + EntityKind<Canister = C>,
383 {
384 self.with_query_visible_indexes(query, |query, visible_indexes| {
385 query.explain_prepared_projection_terminal_with_visible_indexes(
386 visible_indexes,
387 strategy,
388 )
389 })
390 }
391
392 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
395 match family {
396 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
397 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
398 )),
399 ExecutionFamily::Ordered => Ok(()),
400 ExecutionFamily::Grouped => Err(QueryError::invariant(
401 "grouped queries execute via execute(), not page().execute()",
402 )),
403 }
404 }
405
406 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
409 match family {
410 ExecutionFamily::Grouped => Ok(()),
411 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
412 "grouped execution requires grouped logical plans",
413 )),
414 }
415 }
416
417 fn finalize_grouped_execution_page(
421 page: GroupedCursorPage,
422 trace: Option<ExecutionTrace>,
423 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
424 finalize_grouped_paged_execution(page, trace)
425 }
426
427 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
429 where
430 E: PersistedRow<Canister = C> + EntityValue,
431 {
432 let mode = query.mode();
434 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
435
436 self.execute_query_dyn(mode, plan)
438 }
439
440 #[cfg(feature = "diagnostics")]
443 #[doc(hidden)]
444 #[expect(
445 clippy::too_many_lines,
446 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
447 )]
448 #[expect(
449 clippy::needless_update,
450 reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
451 )]
452 pub fn execute_query_result_with_attribution<E>(
453 &self,
454 query: &Query<E>,
455 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
456 where
457 E: PersistedRow<Canister = C> + EntityValue,
458 {
459 let (plan_lookup_local_instructions, plan_and_cache) =
464 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
465 let (plan, cache_attribution) = plan_and_cache?;
466 let compile_local_instructions = plan_lookup_local_instructions;
467
468 let result =
471 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
472 if query.has_grouping() {
473 let (executor_invocation_local_instructions, grouped_page) =
474 measure_query_stage(|| {
475 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
476 executor
477 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
478 plan, cursor,
479 )
480 })
481 });
482 let (page, trace, phase_attribution) = grouped_page?;
483 let (response_finalization_local_instructions, grouped) =
484 measure_query_stage(|| Self::finalize_grouped_execution_page(page, trace));
485 let grouped = grouped?;
486
487 Ok((
488 LoadQueryResult::Grouped(grouped),
489 Self::grouped_query_execute_phase_attribution(
490 phase_attribution,
491 executor_invocation_local_instructions,
492 response_finalization_local_instructions,
493 ),
494 0,
495 ))
496 } else {
497 match query.mode() {
498 QueryMode::Load(_) => {
499 let (executor_invocation_local_instructions, executed) =
500 measure_query_stage(|| {
501 self.load_executor::<E>()
502 .execute_with_phase_attribution(plan)
503 .map_err(QueryError::execute)
504 });
505 let (rows, phase_attribution, response_decode_local_instructions) =
506 executed?;
507
508 Ok((
509 LoadQueryResult::Rows(rows),
510 Self::scalar_query_execute_phase_attribution(
511 phase_attribution,
512 executor_invocation_local_instructions,
513 ),
514 response_decode_local_instructions,
515 ))
516 }
517 QueryMode::Delete(_) => {
518 let (executor_invocation_local_instructions, result) =
519 measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
520 let result = result?;
521
522 Ok((
523 LoadQueryResult::Rows(result),
524 QueryExecutePhaseAttribution {
525 executor_invocation_local_instructions,
526 ..Self::empty_query_execute_phase_attribution()
527 },
528 0,
529 ))
530 }
531 }
532 }
533 }();
534 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
535 let execute_local_instructions = execute_phase_attribution
536 .executor_invocation_local_instructions
537 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
538 let total_local_instructions =
539 compile_local_instructions.saturating_add(execute_local_instructions);
540
541 Ok((
542 result,
543 QueryExecutionAttribution {
544 compile_local_instructions,
545 plan_lookup_local_instructions,
546 executor_invocation_local_instructions: execute_phase_attribution
547 .executor_invocation_local_instructions,
548 response_finalization_local_instructions: execute_phase_attribution
549 .response_finalization_local_instructions,
550 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
551 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
552 direct_data_row_scan_local_instructions: execute_phase_attribution
553 .direct_data_row_scan_local_instructions,
554 direct_data_row_key_stream_local_instructions: execute_phase_attribution
555 .direct_data_row_key_stream_local_instructions,
556 direct_data_row_row_read_local_instructions: execute_phase_attribution
557 .direct_data_row_row_read_local_instructions,
558 direct_data_row_key_encode_local_instructions: execute_phase_attribution
559 .direct_data_row_key_encode_local_instructions,
560 direct_data_row_store_get_local_instructions: execute_phase_attribution
561 .direct_data_row_store_get_local_instructions,
562 direct_data_row_order_window_local_instructions: execute_phase_attribution
563 .direct_data_row_order_window_local_instructions,
564 direct_data_row_page_window_local_instructions: execute_phase_attribution
565 .direct_data_row_page_window_local_instructions,
566 grouped_stream_local_instructions: execute_phase_attribution
567 .grouped_stream_local_instructions,
568 grouped_fold_local_instructions: execute_phase_attribution
569 .grouped_fold_local_instructions,
570 grouped_finalize_local_instructions: execute_phase_attribution
571 .grouped_finalize_local_instructions,
572 grouped_count_borrowed_hash_computations: execute_phase_attribution
573 .grouped_count
574 .borrowed_hash_computations,
575 grouped_count_bucket_candidate_checks: execute_phase_attribution
576 .grouped_count
577 .bucket_candidate_checks,
578 grouped_count_existing_group_hits: execute_phase_attribution
579 .grouped_count
580 .existing_group_hits,
581 grouped_count_new_group_inserts: execute_phase_attribution
582 .grouped_count
583 .new_group_inserts,
584 grouped_count_row_materialization_local_instructions: execute_phase_attribution
585 .grouped_count
586 .row_materialization_local_instructions,
587 grouped_count_group_lookup_local_instructions: execute_phase_attribution
588 .grouped_count
589 .group_lookup_local_instructions,
590 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
591 .grouped_count
592 .existing_group_update_local_instructions,
593 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
594 .grouped_count
595 .new_group_insert_local_instructions,
596 response_decode_local_instructions,
597 execute_local_instructions,
598 total_local_instructions,
599 shared_query_plan_cache_hits: cache_attribution.hits,
600 shared_query_plan_cache_misses: cache_attribution.misses,
601 ..QueryExecutionAttribution::default()
602 },
603 ))
604 }
605
606 #[doc(hidden)]
609 pub fn execute_query_result<E>(
610 &self,
611 query: &Query<E>,
612 ) -> Result<LoadQueryResult<E>, QueryError>
613 where
614 E: PersistedRow<Canister = C> + EntityValue,
615 {
616 if query.has_grouping() {
617 return self
618 .execute_grouped(query, None)
619 .map(LoadQueryResult::Grouped);
620 }
621
622 self.execute_query(query).map(LoadQueryResult::Rows)
623 }
624
625 #[doc(hidden)]
627 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
628 where
629 E: PersistedRow<Canister = C> + EntityValue,
630 {
631 if !query.mode().is_delete() {
633 return Err(QueryError::unsupported_query(
634 "delete count execution requires delete query mode",
635 ));
636 }
637
638 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
642
643 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
645 .map_err(QueryError::execute)
646 }
647
648 pub(in crate::db) fn execute_query_dyn<E>(
653 &self,
654 mode: QueryMode,
655 plan: PreparedExecutionPlan<E>,
656 ) -> Result<EntityResponse<E>, QueryError>
657 where
658 E: PersistedRow<Canister = C> + EntityValue,
659 {
660 let result = match mode {
661 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
662 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
663 };
664
665 result.map_err(QueryError::execute)
666 }
667
668 pub(in crate::db) fn execute_load_query_with<E, T>(
671 &self,
672 query: &Query<E>,
673 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
674 ) -> Result<T, QueryError>
675 where
676 E: PersistedRow<Canister = C> + EntityValue,
677 {
678 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
679
680 self.with_metrics(|| op(self.load_executor::<E>(), plan))
681 .map_err(QueryError::execute)
682 }
683
684 fn execute_scalar_terminal_boundary<E>(
687 &self,
688 query: &Query<E>,
689 request: ScalarTerminalBoundaryRequest,
690 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
691 where
692 E: PersistedRow<Canister = C> + EntityValue,
693 {
694 self.execute_load_query_with(query, move |load, plan| {
695 load.execute_scalar_terminal_request(plan, request)
696 })
697 }
698
699 fn execute_scalar_projection_boundary<E>(
702 &self,
703 query: &Query<E>,
704 target_field: FieldSlot,
705 request: ScalarProjectionBoundaryRequest,
706 ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
707 where
708 E: PersistedRow<Canister = C> + EntityValue,
709 {
710 self.execute_load_query_with(query, move |load, plan| {
711 load.execute_scalar_projection_boundary(plan, target_field, request)
712 })
713 }
714
715 pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
718 &self,
719 query: &Query<E>,
720 strategy: PreparedFluentExistingRowsTerminalStrategy,
721 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
722 where
723 E: PersistedRow<Canister = C> + EntityValue,
724 {
725 match strategy.into_runtime_request() {
726 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => self
727 .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Count)?
728 .into_count()
729 .map(FluentScalarTerminalOutput::Count)
730 .map_err(QueryError::execute),
731 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => self
732 .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Exists)?
733 .into_exists()
734 .map(FluentScalarTerminalOutput::Exists)
735 .map_err(QueryError::execute),
736 }
737 }
738
739 pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
742 &self,
743 query: &Query<E>,
744 strategy: PreparedFluentScalarTerminalStrategy,
745 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
746 where
747 E: PersistedRow<Canister = C> + EntityValue,
748 {
749 let request = match strategy.into_runtime_request() {
750 PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
751 ScalarTerminalBoundaryRequest::IdTerminal { kind }
752 }
753 PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
754 ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
755 }
756 };
757
758 self.execute_scalar_terminal_boundary(query, request)?
759 .into_id::<E>()
760 .map(FluentScalarTerminalOutput::Id)
761 .map_err(QueryError::execute)
762 }
763
764 pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
767 &self,
768 query: &Query<E>,
769 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
770 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
771 where
772 E: PersistedRow<Canister = C> + EntityValue,
773 {
774 match strategy.into_runtime_request() {
775 PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => self
776 .execute_scalar_terminal_boundary(
777 query,
778 ScalarTerminalBoundaryRequest::IdTerminal { kind },
779 )?
780 .into_id::<E>()
781 .map(FluentScalarTerminalOutput::Id)
782 .map_err(QueryError::execute),
783 PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
784 self.execute_scalar_terminal_boundary(
785 query,
786 ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth },
787 )?
788 .into_id::<E>()
789 .map(FluentScalarTerminalOutput::Id)
790 .map_err(QueryError::execute)
791 }
792 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
793 self.execute_scalar_terminal_boundary(
794 query,
795 ScalarTerminalBoundaryRequest::MedianBySlot { target_field },
796 )?
797 .into_id::<E>()
798 .map(FluentScalarTerminalOutput::Id)
799 .map_err(QueryError::execute)
800 }
801 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
802 self.execute_scalar_terminal_boundary(
803 query,
804 ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field },
805 )?
806 .into_id_pair::<E>()
807 .map(FluentScalarTerminalOutput::IdPair)
808 .map_err(QueryError::execute)
809 }
810 }
811 }
812
813 pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
816 &self,
817 query: &Query<E>,
818 strategy: PreparedFluentNumericFieldStrategy,
819 ) -> Result<Option<Decimal>, QueryError>
820 where
821 E: PersistedRow<Canister = C> + EntityValue,
822 {
823 let (target_field, runtime_request) = strategy.into_runtime_parts();
824 let request = match runtime_request {
825 PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
826 PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
827 ScalarNumericFieldBoundaryRequest::SumDistinct
828 }
829 PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
830 PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
831 ScalarNumericFieldBoundaryRequest::AvgDistinct
832 }
833 };
834
835 self.execute_load_query_with(query, move |load, plan| {
836 load.execute_numeric_field_boundary(plan, target_field, request)
837 })
838 }
839
840 pub(in crate::db) fn execute_fluent_projection_terminal<E>(
843 &self,
844 query: &Query<E>,
845 strategy: PreparedFluentProjectionStrategy,
846 ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
847 where
848 E: PersistedRow<Canister = C> + EntityValue,
849 {
850 let (target_field, runtime_request) = strategy.into_runtime_parts();
851
852 match runtime_request {
853 PreparedFluentProjectionRuntimeRequest::Values => self
854 .execute_scalar_projection_boundary(
855 query,
856 target_field,
857 ScalarProjectionBoundaryRequest::Values,
858 )?
859 .into_values()
860 .map(FluentProjectionTerminalOutput::Values)
861 .map_err(QueryError::execute),
862 PreparedFluentProjectionRuntimeRequest::DistinctValues => self
863 .execute_scalar_projection_boundary(
864 query,
865 target_field,
866 ScalarProjectionBoundaryRequest::DistinctValues,
867 )?
868 .into_values()
869 .map(FluentProjectionTerminalOutput::Values)
870 .map_err(QueryError::execute),
871 PreparedFluentProjectionRuntimeRequest::CountDistinct => self
872 .execute_scalar_projection_boundary(
873 query,
874 target_field,
875 ScalarProjectionBoundaryRequest::CountDistinct,
876 )?
877 .into_count()
878 .map(FluentProjectionTerminalOutput::Count)
879 .map_err(QueryError::execute),
880 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => self
881 .execute_scalar_projection_boundary(
882 query,
883 target_field,
884 ScalarProjectionBoundaryRequest::ValuesWithIds,
885 )?
886 .into_values_with_ids::<E>()
887 .map(FluentProjectionTerminalOutput::ValuesWithIds)
888 .map_err(QueryError::execute),
889 PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => self
890 .execute_scalar_projection_boundary(
891 query,
892 target_field,
893 ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind },
894 )?
895 .into_terminal_value()
896 .map(FluentProjectionTerminalOutput::TerminalValue)
897 .map_err(QueryError::execute),
898 }
899 }
900
901 pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
904 where
905 E: PersistedRow<Canister = C> + EntityValue,
906 {
907 self.execute_load_query_with(query, |load, plan| load.bytes(plan))
908 }
909
910 pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
912 &self,
913 query: &Query<E>,
914 target_slot: FieldSlot,
915 ) -> Result<u64, QueryError>
916 where
917 E: PersistedRow<Canister = C> + EntityValue,
918 {
919 self.execute_load_query_with(query, move |load, plan| {
920 load.bytes_by_slot(plan, target_slot)
921 })
922 }
923
924 pub(in crate::db) fn execute_fluent_take<E>(
926 &self,
927 query: &Query<E>,
928 take_count: u32,
929 ) -> Result<EntityResponse<E>, QueryError>
930 where
931 E: PersistedRow<Canister = C> + EntityValue,
932 {
933 self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
934 }
935
936 pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
938 &self,
939 query: &Query<E>,
940 target_slot: FieldSlot,
941 take_count: u32,
942 descending: bool,
943 ) -> Result<EntityResponse<E>, QueryError>
944 where
945 E: PersistedRow<Canister = C> + EntityValue,
946 {
947 self.execute_load_query_with(query, move |load, plan| {
948 if descending {
949 load.top_k_by_slot(plan, target_slot, take_count)
950 } else {
951 load.bottom_k_by_slot(plan, target_slot, take_count)
952 }
953 })
954 }
955
956 pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
958 &self,
959 query: &Query<E>,
960 target_slot: FieldSlot,
961 take_count: u32,
962 descending: bool,
963 ) -> Result<Vec<Value>, QueryError>
964 where
965 E: PersistedRow<Canister = C> + EntityValue,
966 {
967 self.execute_load_query_with(query, move |load, plan| {
968 if descending {
969 load.top_k_by_values_slot(plan, target_slot, take_count)
970 } else {
971 load.bottom_k_by_values_slot(plan, target_slot, take_count)
972 }
973 })
974 }
975
976 pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
978 &self,
979 query: &Query<E>,
980 target_slot: FieldSlot,
981 take_count: u32,
982 descending: bool,
983 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
984 where
985 E: PersistedRow<Canister = C> + EntityValue,
986 {
987 self.execute_load_query_with(query, move |load, plan| {
988 if descending {
989 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
990 } else {
991 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
992 }
993 })
994 }
995
996 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
1001 where
1002 E: EntityKind<Canister = C>,
1003 {
1004 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
1005 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
1006 let (prepared_plan, cache_attribution) =
1007 self.cached_prepared_query_plan_for_entity::<E>(query)?;
1008 let logical_plan = prepared_plan.logical_plan();
1009 let explain = logical_plan.explain();
1010 let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
1011 let executable_access = prepared_plan.access().executable_contract();
1012 let access_strategy = summarize_executable_access_plan(&executable_access);
1013 let execution_family = match query.mode() {
1014 QueryMode::Load(_) => Some(trace_execution_family_from_executor(
1015 prepared_plan
1016 .execution_family()
1017 .map_err(QueryError::execute)?,
1018 )),
1019 QueryMode::Delete(_) => None,
1020 };
1021 let reuse = query_plan_cache_reuse_event(cache_attribution);
1022
1023 Ok(QueryTracePlan::new(
1024 plan_hash,
1025 access_strategy,
1026 execution_family,
1027 reuse,
1028 explain,
1029 ))
1030 }
1031
1032 pub(crate) fn execute_load_query_paged_with_trace<E>(
1034 &self,
1035 query: &Query<E>,
1036 cursor_token: Option<&str>,
1037 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
1038 where
1039 E: PersistedRow<Canister = C> + EntityValue,
1040 {
1041 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1043 Self::ensure_scalar_paged_execution_family(
1044 plan.execution_family().map_err(QueryError::execute)?,
1045 )?;
1046
1047 let cursor_bytes = decode_optional_cursor_token(cursor_token)
1049 .map_err(QueryError::from_cursor_plan_error)?;
1050 let cursor = plan
1051 .prepare_cursor(cursor_bytes.as_deref())
1052 .map_err(query_error_from_executor_plan_error)?;
1053
1054 let (page, trace) = self
1056 .with_metrics(|| {
1057 self.load_executor::<E>()
1058 .execute_paged_with_cursor_traced(plan, cursor)
1059 })
1060 .map_err(QueryError::execute)?;
1061 finalize_scalar_paged_execution(page, trace)
1062 }
1063
1064 pub(in crate::db) fn execute_grouped<E>(
1069 &self,
1070 query: &Query<E>,
1071 cursor_token: Option<&str>,
1072 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1073 where
1074 E: PersistedRow<Canister = C> + EntityValue,
1075 {
1076 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1078
1079 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1082
1083 Self::finalize_grouped_execution_page(page, trace)
1084 }
1085
1086 fn execute_grouped_plan_with<E, T>(
1089 &self,
1090 plan: PreparedExecutionPlan<E>,
1091 cursor_token: Option<&str>,
1092 op: impl FnOnce(
1093 LoadExecutor<E>,
1094 PreparedExecutionPlan<E>,
1095 crate::db::cursor::GroupedPlannedCursor,
1096 ) -> Result<T, InternalError>,
1097 ) -> Result<T, QueryError>
1098 where
1099 E: PersistedRow<Canister = C> + EntityValue,
1100 {
1101 Self::ensure_grouped_execution_family(
1103 plan.execution_family().map_err(QueryError::execute)?,
1104 )?;
1105
1106 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1108 .map_err(QueryError::from_cursor_plan_error)?;
1109 let cursor = plan
1110 .prepare_grouped_cursor_token(cursor)
1111 .map_err(query_error_from_executor_plan_error)?;
1112
1113 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1116 .map_err(QueryError::execute)
1117 }
1118
1119 fn execute_grouped_plan_with_trace<E>(
1121 &self,
1122 plan: PreparedExecutionPlan<E>,
1123 cursor_token: Option<&str>,
1124 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1125 where
1126 E: PersistedRow<Canister = C> + EntityValue,
1127 {
1128 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1129 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1130 })
1131 }
1132}