1mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::executor::{
11 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
12};
13use crate::{
14 db::{
15 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
16 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
17 TraceExecutionFamily,
18 access::summarize_executable_access_plan,
19 cursor::{
20 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
21 },
22 diagnostics::ExecutionTrace,
23 executor::{
24 ExecutionFamily, ExecutorPlanError, GroupedCursorPage, LoadExecutor,
25 PreparedExecutionPlan, ScalarNumericFieldBoundaryRequest,
26 ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
27 ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
28 },
29 query::builder::{
30 PreparedFluentAggregateExplainStrategy,
31 PreparedFluentExistingRowsTerminalRuntimeRequest,
32 PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldRuntimeRequest,
33 PreparedFluentNumericFieldStrategy, PreparedFluentOrderSensitiveTerminalRuntimeRequest,
34 PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
35 PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalRuntimeRequest,
36 PreparedFluentScalarTerminalStrategy,
37 },
38 query::explain::{
39 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40 },
41 query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42 query::{
43 intent::{CompiledQuery, PlannedQuery},
44 plan::{FieldSlot, QueryMode},
45 },
46 session::{finalize_grouped_paged_execution, finalize_scalar_paged_execution},
47 },
48 error::InternalError,
49 traits::{CanisterKind, EntityKind, EntityValue, Path},
50 types::{Decimal, Id},
51 value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65 match family {
66 ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67 ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68 ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69 }
70}
71
72pub(in crate::db::session) fn query_error_from_executor_plan_error(
75 err: ExecutorPlanError,
76) -> QueryError {
77 match err {
78 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79 }
80}
81
82#[cfg(feature = "diagnostics")]
89#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
90pub struct QueryExecutionAttribution {
91 pub compile_local_instructions: u64,
92 pub plan_lookup_local_instructions: u64,
93 pub executor_invocation_local_instructions: u64,
94 pub response_finalization_local_instructions: u64,
95 pub runtime_local_instructions: u64,
96 pub finalize_local_instructions: u64,
97 pub direct_data_row_scan_local_instructions: u64,
98 pub direct_data_row_key_stream_local_instructions: u64,
99 pub direct_data_row_row_read_local_instructions: u64,
100 pub direct_data_row_key_encode_local_instructions: u64,
101 pub direct_data_row_store_get_local_instructions: u64,
102 pub direct_data_row_order_window_local_instructions: u64,
103 pub direct_data_row_page_window_local_instructions: u64,
104 pub grouped_stream_local_instructions: u64,
105 pub grouped_fold_local_instructions: u64,
106 pub grouped_finalize_local_instructions: u64,
107 pub grouped_count_borrowed_hash_computations: u64,
108 pub grouped_count_bucket_candidate_checks: u64,
109 pub grouped_count_existing_group_hits: u64,
110 pub grouped_count_new_group_inserts: u64,
111 pub grouped_count_row_materialization_local_instructions: u64,
112 pub grouped_count_group_lookup_local_instructions: u64,
113 pub grouped_count_existing_group_update_local_instructions: u64,
114 pub grouped_count_new_group_insert_local_instructions: u64,
115 pub response_decode_local_instructions: u64,
116 pub execute_local_instructions: u64,
117 pub total_local_instructions: u64,
118 pub shared_query_plan_cache_hits: u64,
119 pub shared_query_plan_cache_misses: u64,
120}
121
122#[cfg(feature = "diagnostics")]
123#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
124struct QueryExecutePhaseAttribution {
125 executor_invocation_local_instructions: u64,
126 response_finalization_local_instructions: u64,
127 runtime_local_instructions: u64,
128 finalize_local_instructions: u64,
129 direct_data_row_scan_local_instructions: u64,
130 direct_data_row_key_stream_local_instructions: u64,
131 direct_data_row_row_read_local_instructions: u64,
132 direct_data_row_key_encode_local_instructions: u64,
133 direct_data_row_store_get_local_instructions: u64,
134 direct_data_row_order_window_local_instructions: u64,
135 direct_data_row_page_window_local_instructions: u64,
136 grouped_stream_local_instructions: u64,
137 grouped_fold_local_instructions: u64,
138 grouped_finalize_local_instructions: u64,
139 grouped_count: GroupedCountAttribution,
140}
141
142#[cfg(feature = "diagnostics")]
143#[expect(
144 clippy::missing_const_for_fn,
145 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
146)]
147fn read_query_local_instruction_counter() -> u64 {
148 #[cfg(target_arch = "wasm32")]
149 {
150 canic_cdk::api::performance_counter(1)
151 }
152
153 #[cfg(not(target_arch = "wasm32"))]
154 {
155 0
156 }
157}
158
159#[cfg(feature = "diagnostics")]
160fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
161 let start = read_query_local_instruction_counter();
162 let result = run();
163 let delta = read_query_local_instruction_counter().saturating_sub(start);
164
165 (delta, result)
166}
167
168impl<C: CanisterKind> DbSession<C> {
169 #[cfg(feature = "diagnostics")]
170 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
171 QueryExecutePhaseAttribution {
172 executor_invocation_local_instructions: 0,
173 response_finalization_local_instructions: 0,
174 runtime_local_instructions: 0,
175 finalize_local_instructions: 0,
176 direct_data_row_scan_local_instructions: 0,
177 direct_data_row_key_stream_local_instructions: 0,
178 direct_data_row_row_read_local_instructions: 0,
179 direct_data_row_key_encode_local_instructions: 0,
180 direct_data_row_store_get_local_instructions: 0,
181 direct_data_row_order_window_local_instructions: 0,
182 direct_data_row_page_window_local_instructions: 0,
183 grouped_stream_local_instructions: 0,
184 grouped_fold_local_instructions: 0,
185 grouped_finalize_local_instructions: 0,
186 grouped_count: GroupedCountAttribution::none(),
187 }
188 }
189
190 #[cfg(feature = "diagnostics")]
191 const fn scalar_query_execute_phase_attribution(
192 phase: ScalarExecutePhaseAttribution,
193 executor_invocation_local_instructions: u64,
194 ) -> QueryExecutePhaseAttribution {
195 QueryExecutePhaseAttribution {
196 executor_invocation_local_instructions,
197 response_finalization_local_instructions: 0,
198 runtime_local_instructions: phase.runtime_local_instructions,
199 finalize_local_instructions: phase.finalize_local_instructions,
200 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
201 direct_data_row_key_stream_local_instructions: phase
202 .direct_data_row_key_stream_local_instructions,
203 direct_data_row_row_read_local_instructions: phase
204 .direct_data_row_row_read_local_instructions,
205 direct_data_row_key_encode_local_instructions: phase
206 .direct_data_row_key_encode_local_instructions,
207 direct_data_row_store_get_local_instructions: phase
208 .direct_data_row_store_get_local_instructions,
209 direct_data_row_order_window_local_instructions: phase
210 .direct_data_row_order_window_local_instructions,
211 direct_data_row_page_window_local_instructions: phase
212 .direct_data_row_page_window_local_instructions,
213 grouped_stream_local_instructions: 0,
214 grouped_fold_local_instructions: 0,
215 grouped_finalize_local_instructions: 0,
216 grouped_count: GroupedCountAttribution::none(),
217 }
218 }
219
220 #[cfg(feature = "diagnostics")]
221 const fn grouped_query_execute_phase_attribution(
222 phase: GroupedExecutePhaseAttribution,
223 executor_invocation_local_instructions: u64,
224 response_finalization_local_instructions: u64,
225 ) -> QueryExecutePhaseAttribution {
226 QueryExecutePhaseAttribution {
227 executor_invocation_local_instructions,
228 response_finalization_local_instructions,
229 runtime_local_instructions: phase
230 .stream_local_instructions
231 .saturating_add(phase.fold_local_instructions),
232 finalize_local_instructions: phase.finalize_local_instructions,
233 direct_data_row_scan_local_instructions: 0,
234 direct_data_row_key_stream_local_instructions: 0,
235 direct_data_row_row_read_local_instructions: 0,
236 direct_data_row_key_encode_local_instructions: 0,
237 direct_data_row_store_get_local_instructions: 0,
238 direct_data_row_order_window_local_instructions: 0,
239 direct_data_row_page_window_local_instructions: 0,
240 grouped_stream_local_instructions: phase.stream_local_instructions,
241 grouped_fold_local_instructions: phase.fold_local_instructions,
242 grouped_finalize_local_instructions: phase.finalize_local_instructions,
243 grouped_count: phase.grouped_count,
244 }
245 }
246
247 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
250 &self,
251 query: &Query<E>,
252 ) -> Result<CompiledQuery<E>, QueryError>
253 where
254 E: EntityKind<Canister = C>,
255 {
256 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
257 }
258
259 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
262 &self,
263 query: &Query<E>,
264 ) -> Result<PlannedQuery<E>, QueryError>
265 where
266 E: EntityKind<Canister = C>,
267 {
268 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
269 }
270
271 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
273 &self,
274 query: &Query<E>,
275 ) -> Result<ExplainPlan, QueryError>
276 where
277 E: EntityKind<Canister = C>,
278 {
279 self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
280 }
281
282 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
285 &self,
286 query: &Query<E>,
287 ) -> Result<String, QueryError>
288 where
289 E: EntityKind<Canister = C>,
290 {
291 self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
292 }
293
294 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
297 &self,
298 query: &Query<E>,
299 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
300 where
301 E: EntityValue + EntityKind<Canister = C>,
302 {
303 self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
304 }
305
306 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
309 &self,
310 query: &Query<E>,
311 ) -> Result<String, QueryError>
312 where
313 E: EntityValue + EntityKind<Canister = C>,
314 {
315 self.with_query_visible_indexes(query, |query, visible_indexes| {
316 let (prepared_plan, cache_attribution) =
317 self.cached_prepared_query_plan_for_entity(query)?;
318 let mut plan = prepared_plan.logical_plan().clone();
319
320 plan.finalize_access_choice_for_model_with_indexes(
324 query.structural().model(),
325 visible_indexes.as_slice(),
326 );
327
328 query
329 .structural()
330 .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
331 &plan,
332 Some(query_plan_cache_reuse_event(cache_attribution)),
333 |_| {},
334 )
335 .map(|diagnostics| diagnostics.render_text_verbose())
336 })
337 }
338
339 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
342 &self,
343 query: &Query<E>,
344 strategy: &S,
345 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
346 where
347 E: EntityValue + EntityKind<Canister = C>,
348 S: PreparedFluentAggregateExplainStrategy,
349 {
350 self.with_query_visible_indexes(query, |query, visible_indexes| {
351 query
352 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
353 })
354 }
355
356 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
359 &self,
360 query: &Query<E>,
361 target_field: &str,
362 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
363 where
364 E: EntityValue + EntityKind<Canister = C>,
365 {
366 self.with_query_visible_indexes(query, |query, visible_indexes| {
367 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
368 })
369 }
370
371 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
374 &self,
375 query: &Query<E>,
376 strategy: &PreparedFluentProjectionStrategy,
377 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
378 where
379 E: EntityValue + EntityKind<Canister = C>,
380 {
381 self.with_query_visible_indexes(query, |query, visible_indexes| {
382 query.explain_prepared_projection_terminal_with_visible_indexes(
383 visible_indexes,
384 strategy,
385 )
386 })
387 }
388
389 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
392 match family {
393 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
394 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
395 )),
396 ExecutionFamily::Ordered => Ok(()),
397 ExecutionFamily::Grouped => Err(QueryError::invariant(
398 "grouped queries execute via execute(), not page().execute()",
399 )),
400 }
401 }
402
403 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
406 match family {
407 ExecutionFamily::Grouped => Ok(()),
408 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
409 "grouped execution requires grouped logical plans",
410 )),
411 }
412 }
413
414 fn finalize_grouped_execution_page(
418 page: GroupedCursorPage,
419 trace: Option<ExecutionTrace>,
420 ) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
421 finalize_grouped_paged_execution(page, trace)
422 }
423
424 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
426 where
427 E: PersistedRow<Canister = C> + EntityValue,
428 {
429 let mode = query.mode();
431 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
432
433 self.execute_query_dyn(mode, plan)
435 }
436
437 #[cfg(feature = "diagnostics")]
440 #[doc(hidden)]
441 #[expect(
442 clippy::too_many_lines,
443 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
444 )]
445 pub fn execute_query_result_with_attribution<E>(
446 &self,
447 query: &Query<E>,
448 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
449 where
450 E: PersistedRow<Canister = C> + EntityValue,
451 {
452 let (plan_lookup_local_instructions, plan_and_cache) =
457 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
458 let (plan, cache_attribution) = plan_and_cache?;
459 let compile_local_instructions = plan_lookup_local_instructions;
460
461 let result =
464 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
465 if query.has_grouping() {
466 let (executor_invocation_local_instructions, grouped_page) =
467 measure_query_stage(|| {
468 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
469 executor
470 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
471 plan, cursor,
472 )
473 })
474 });
475 let (page, trace, phase_attribution) = grouped_page?;
476 let (response_finalization_local_instructions, grouped) =
477 measure_query_stage(|| Self::finalize_grouped_execution_page(page, trace));
478 let grouped = grouped?;
479
480 Ok((
481 LoadQueryResult::Grouped(grouped),
482 Self::grouped_query_execute_phase_attribution(
483 phase_attribution,
484 executor_invocation_local_instructions,
485 response_finalization_local_instructions,
486 ),
487 0,
488 ))
489 } else {
490 match query.mode() {
491 QueryMode::Load(_) => {
492 let (executor_invocation_local_instructions, executed) =
493 measure_query_stage(|| {
494 self.load_executor::<E>()
495 .execute_with_phase_attribution(plan)
496 .map_err(QueryError::execute)
497 });
498 let (rows, phase_attribution, response_decode_local_instructions) =
499 executed?;
500
501 Ok((
502 LoadQueryResult::Rows(rows),
503 Self::scalar_query_execute_phase_attribution(
504 phase_attribution,
505 executor_invocation_local_instructions,
506 ),
507 response_decode_local_instructions,
508 ))
509 }
510 QueryMode::Delete(_) => {
511 let (executor_invocation_local_instructions, result) =
512 measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
513 let result = result?;
514
515 Ok((
516 LoadQueryResult::Rows(result),
517 QueryExecutePhaseAttribution {
518 executor_invocation_local_instructions,
519 ..Self::empty_query_execute_phase_attribution()
520 },
521 0,
522 ))
523 }
524 }
525 }
526 }();
527 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
528 let execute_local_instructions = execute_phase_attribution
529 .executor_invocation_local_instructions
530 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
531 let total_local_instructions =
532 compile_local_instructions.saturating_add(execute_local_instructions);
533
534 Ok((
535 result,
536 QueryExecutionAttribution {
537 compile_local_instructions,
538 plan_lookup_local_instructions,
539 executor_invocation_local_instructions: execute_phase_attribution
540 .executor_invocation_local_instructions,
541 response_finalization_local_instructions: execute_phase_attribution
542 .response_finalization_local_instructions,
543 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
544 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
545 direct_data_row_scan_local_instructions: execute_phase_attribution
546 .direct_data_row_scan_local_instructions,
547 direct_data_row_key_stream_local_instructions: execute_phase_attribution
548 .direct_data_row_key_stream_local_instructions,
549 direct_data_row_row_read_local_instructions: execute_phase_attribution
550 .direct_data_row_row_read_local_instructions,
551 direct_data_row_key_encode_local_instructions: execute_phase_attribution
552 .direct_data_row_key_encode_local_instructions,
553 direct_data_row_store_get_local_instructions: execute_phase_attribution
554 .direct_data_row_store_get_local_instructions,
555 direct_data_row_order_window_local_instructions: execute_phase_attribution
556 .direct_data_row_order_window_local_instructions,
557 direct_data_row_page_window_local_instructions: execute_phase_attribution
558 .direct_data_row_page_window_local_instructions,
559 grouped_stream_local_instructions: execute_phase_attribution
560 .grouped_stream_local_instructions,
561 grouped_fold_local_instructions: execute_phase_attribution
562 .grouped_fold_local_instructions,
563 grouped_finalize_local_instructions: execute_phase_attribution
564 .grouped_finalize_local_instructions,
565 grouped_count_borrowed_hash_computations: execute_phase_attribution
566 .grouped_count
567 .borrowed_hash_computations,
568 grouped_count_bucket_candidate_checks: execute_phase_attribution
569 .grouped_count
570 .bucket_candidate_checks,
571 grouped_count_existing_group_hits: execute_phase_attribution
572 .grouped_count
573 .existing_group_hits,
574 grouped_count_new_group_inserts: execute_phase_attribution
575 .grouped_count
576 .new_group_inserts,
577 grouped_count_row_materialization_local_instructions: execute_phase_attribution
578 .grouped_count
579 .row_materialization_local_instructions,
580 grouped_count_group_lookup_local_instructions: execute_phase_attribution
581 .grouped_count
582 .group_lookup_local_instructions,
583 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
584 .grouped_count
585 .existing_group_update_local_instructions,
586 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
587 .grouped_count
588 .new_group_insert_local_instructions,
589 response_decode_local_instructions,
590 execute_local_instructions,
591 total_local_instructions,
592 shared_query_plan_cache_hits: cache_attribution.hits,
593 shared_query_plan_cache_misses: cache_attribution.misses,
594 },
595 ))
596 }
597
598 #[doc(hidden)]
601 pub fn execute_query_result<E>(
602 &self,
603 query: &Query<E>,
604 ) -> Result<LoadQueryResult<E>, QueryError>
605 where
606 E: PersistedRow<Canister = C> + EntityValue,
607 {
608 if query.has_grouping() {
609 return self
610 .execute_grouped(query, None)
611 .map(LoadQueryResult::Grouped);
612 }
613
614 self.execute_query(query).map(LoadQueryResult::Rows)
615 }
616
617 #[doc(hidden)]
619 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
620 where
621 E: PersistedRow<Canister = C> + EntityValue,
622 {
623 if !query.mode().is_delete() {
625 return Err(QueryError::unsupported_query(
626 "delete count execution requires delete query mode",
627 ));
628 }
629
630 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
634
635 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
637 .map_err(QueryError::execute)
638 }
639
640 pub(in crate::db) fn execute_query_dyn<E>(
645 &self,
646 mode: QueryMode,
647 plan: PreparedExecutionPlan<E>,
648 ) -> Result<EntityResponse<E>, QueryError>
649 where
650 E: PersistedRow<Canister = C> + EntityValue,
651 {
652 let result = match mode {
653 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
654 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
655 };
656
657 result.map_err(QueryError::execute)
658 }
659
660 pub(in crate::db) fn execute_load_query_with<E, T>(
663 &self,
664 query: &Query<E>,
665 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
666 ) -> Result<T, QueryError>
667 where
668 E: PersistedRow<Canister = C> + EntityValue,
669 {
670 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
671
672 self.with_metrics(|| op(self.load_executor::<E>(), plan))
673 .map_err(QueryError::execute)
674 }
675
676 fn execute_scalar_terminal_boundary<E>(
679 &self,
680 query: &Query<E>,
681 request: ScalarTerminalBoundaryRequest,
682 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
683 where
684 E: PersistedRow<Canister = C> + EntityValue,
685 {
686 self.execute_load_query_with(query, move |load, plan| {
687 load.execute_scalar_terminal_request(plan, request)
688 })
689 }
690
691 fn execute_scalar_projection_boundary<E>(
694 &self,
695 query: &Query<E>,
696 target_field: FieldSlot,
697 request: ScalarProjectionBoundaryRequest,
698 ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
699 where
700 E: PersistedRow<Canister = C> + EntityValue,
701 {
702 self.execute_load_query_with(query, move |load, plan| {
703 load.execute_scalar_projection_boundary(plan, target_field, request)
704 })
705 }
706
707 pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
710 &self,
711 query: &Query<E>,
712 strategy: PreparedFluentExistingRowsTerminalStrategy,
713 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
714 where
715 E: PersistedRow<Canister = C> + EntityValue,
716 {
717 match strategy.into_runtime_request() {
718 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => self
719 .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Count)?
720 .into_count()
721 .map(FluentScalarTerminalOutput::Count)
722 .map_err(QueryError::execute),
723 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => self
724 .execute_scalar_terminal_boundary(query, ScalarTerminalBoundaryRequest::Exists)?
725 .into_exists()
726 .map(FluentScalarTerminalOutput::Exists)
727 .map_err(QueryError::execute),
728 }
729 }
730
731 pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
734 &self,
735 query: &Query<E>,
736 strategy: PreparedFluentScalarTerminalStrategy,
737 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
738 where
739 E: PersistedRow<Canister = C> + EntityValue,
740 {
741 let request = match strategy.into_runtime_request() {
742 PreparedFluentScalarTerminalRuntimeRequest::IdTerminal { kind } => {
743 ScalarTerminalBoundaryRequest::IdTerminal { kind }
744 }
745 PreparedFluentScalarTerminalRuntimeRequest::IdBySlot { kind, target_field } => {
746 ScalarTerminalBoundaryRequest::IdBySlot { kind, target_field }
747 }
748 };
749
750 self.execute_scalar_terminal_boundary(query, request)?
751 .into_id::<E>()
752 .map(FluentScalarTerminalOutput::Id)
753 .map_err(QueryError::execute)
754 }
755
756 pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
759 &self,
760 query: &Query<E>,
761 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
762 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
763 where
764 E: PersistedRow<Canister = C> + EntityValue,
765 {
766 match strategy.into_runtime_request() {
767 PreparedFluentOrderSensitiveTerminalRuntimeRequest::ResponseOrder { kind } => self
768 .execute_scalar_terminal_boundary(
769 query,
770 ScalarTerminalBoundaryRequest::IdTerminal { kind },
771 )?
772 .into_id::<E>()
773 .map(FluentScalarTerminalOutput::Id)
774 .map_err(QueryError::execute),
775 PreparedFluentOrderSensitiveTerminalRuntimeRequest::NthBySlot { target_field, nth } => {
776 self.execute_scalar_terminal_boundary(
777 query,
778 ScalarTerminalBoundaryRequest::NthBySlot { target_field, nth },
779 )?
780 .into_id::<E>()
781 .map(FluentScalarTerminalOutput::Id)
782 .map_err(QueryError::execute)
783 }
784 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MedianBySlot { target_field } => {
785 self.execute_scalar_terminal_boundary(
786 query,
787 ScalarTerminalBoundaryRequest::MedianBySlot { target_field },
788 )?
789 .into_id::<E>()
790 .map(FluentScalarTerminalOutput::Id)
791 .map_err(QueryError::execute)
792 }
793 PreparedFluentOrderSensitiveTerminalRuntimeRequest::MinMaxBySlot { target_field } => {
794 self.execute_scalar_terminal_boundary(
795 query,
796 ScalarTerminalBoundaryRequest::MinMaxBySlot { target_field },
797 )?
798 .into_id_pair::<E>()
799 .map(FluentScalarTerminalOutput::IdPair)
800 .map_err(QueryError::execute)
801 }
802 }
803 }
804
805 pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
808 &self,
809 query: &Query<E>,
810 strategy: PreparedFluentNumericFieldStrategy,
811 ) -> Result<Option<Decimal>, QueryError>
812 where
813 E: PersistedRow<Canister = C> + EntityValue,
814 {
815 let (target_field, runtime_request) = strategy.into_runtime_parts();
816 let request = match runtime_request {
817 PreparedFluentNumericFieldRuntimeRequest::Sum => ScalarNumericFieldBoundaryRequest::Sum,
818 PreparedFluentNumericFieldRuntimeRequest::SumDistinct => {
819 ScalarNumericFieldBoundaryRequest::SumDistinct
820 }
821 PreparedFluentNumericFieldRuntimeRequest::Avg => ScalarNumericFieldBoundaryRequest::Avg,
822 PreparedFluentNumericFieldRuntimeRequest::AvgDistinct => {
823 ScalarNumericFieldBoundaryRequest::AvgDistinct
824 }
825 };
826
827 self.execute_load_query_with(query, move |load, plan| {
828 load.execute_numeric_field_boundary(plan, target_field, request)
829 })
830 }
831
832 pub(in crate::db) fn execute_fluent_projection_terminal<E>(
835 &self,
836 query: &Query<E>,
837 strategy: PreparedFluentProjectionStrategy,
838 ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
839 where
840 E: PersistedRow<Canister = C> + EntityValue,
841 {
842 let (target_field, runtime_request) = strategy.into_runtime_parts();
843
844 match runtime_request {
845 PreparedFluentProjectionRuntimeRequest::Values => self
846 .execute_scalar_projection_boundary(
847 query,
848 target_field,
849 ScalarProjectionBoundaryRequest::Values,
850 )?
851 .into_values()
852 .map(FluentProjectionTerminalOutput::Values)
853 .map_err(QueryError::execute),
854 PreparedFluentProjectionRuntimeRequest::DistinctValues => self
855 .execute_scalar_projection_boundary(
856 query,
857 target_field,
858 ScalarProjectionBoundaryRequest::DistinctValues,
859 )?
860 .into_values()
861 .map(FluentProjectionTerminalOutput::Values)
862 .map_err(QueryError::execute),
863 PreparedFluentProjectionRuntimeRequest::CountDistinct => self
864 .execute_scalar_projection_boundary(
865 query,
866 target_field,
867 ScalarProjectionBoundaryRequest::CountDistinct,
868 )?
869 .into_count()
870 .map(FluentProjectionTerminalOutput::Count)
871 .map_err(QueryError::execute),
872 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => self
873 .execute_scalar_projection_boundary(
874 query,
875 target_field,
876 ScalarProjectionBoundaryRequest::ValuesWithIds,
877 )?
878 .into_values_with_ids::<E>()
879 .map(FluentProjectionTerminalOutput::ValuesWithIds)
880 .map_err(QueryError::execute),
881 PreparedFluentProjectionRuntimeRequest::TerminalValue { terminal_kind } => self
882 .execute_scalar_projection_boundary(
883 query,
884 target_field,
885 ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind },
886 )?
887 .into_terminal_value()
888 .map(FluentProjectionTerminalOutput::TerminalValue)
889 .map_err(QueryError::execute),
890 }
891 }
892
893 pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
896 where
897 E: PersistedRow<Canister = C> + EntityValue,
898 {
899 self.execute_load_query_with(query, |load, plan| load.bytes(plan))
900 }
901
902 pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
904 &self,
905 query: &Query<E>,
906 target_slot: FieldSlot,
907 ) -> Result<u64, QueryError>
908 where
909 E: PersistedRow<Canister = C> + EntityValue,
910 {
911 self.execute_load_query_with(query, move |load, plan| {
912 load.bytes_by_slot(plan, target_slot)
913 })
914 }
915
916 pub(in crate::db) fn execute_fluent_take<E>(
918 &self,
919 query: &Query<E>,
920 take_count: u32,
921 ) -> Result<EntityResponse<E>, QueryError>
922 where
923 E: PersistedRow<Canister = C> + EntityValue,
924 {
925 self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
926 }
927
928 pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
930 &self,
931 query: &Query<E>,
932 target_slot: FieldSlot,
933 take_count: u32,
934 descending: bool,
935 ) -> Result<EntityResponse<E>, QueryError>
936 where
937 E: PersistedRow<Canister = C> + EntityValue,
938 {
939 self.execute_load_query_with(query, move |load, plan| {
940 if descending {
941 load.top_k_by_slot(plan, target_slot, take_count)
942 } else {
943 load.bottom_k_by_slot(plan, target_slot, take_count)
944 }
945 })
946 }
947
948 pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
950 &self,
951 query: &Query<E>,
952 target_slot: FieldSlot,
953 take_count: u32,
954 descending: bool,
955 ) -> Result<Vec<Value>, QueryError>
956 where
957 E: PersistedRow<Canister = C> + EntityValue,
958 {
959 self.execute_load_query_with(query, move |load, plan| {
960 if descending {
961 load.top_k_by_values_slot(plan, target_slot, take_count)
962 } else {
963 load.bottom_k_by_values_slot(plan, target_slot, take_count)
964 }
965 })
966 }
967
968 pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
970 &self,
971 query: &Query<E>,
972 target_slot: FieldSlot,
973 take_count: u32,
974 descending: bool,
975 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
976 where
977 E: PersistedRow<Canister = C> + EntityValue,
978 {
979 self.execute_load_query_with(query, move |load, plan| {
980 if descending {
981 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
982 } else {
983 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
984 }
985 })
986 }
987
988 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
993 where
994 E: EntityKind<Canister = C>,
995 {
996 let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
997 let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
998 let (prepared_plan, cache_attribution) =
999 self.cached_prepared_query_plan_for_entity::<E>(query)?;
1000 let logical_plan = prepared_plan.logical_plan();
1001 let explain = logical_plan.explain();
1002 let plan_hash = query.plan_hash_hex_with_visible_indexes(&visible_indexes)?;
1003 let executable_access = prepared_plan.access().executable_contract();
1004 let access_strategy = summarize_executable_access_plan(&executable_access);
1005 let execution_family = match query.mode() {
1006 QueryMode::Load(_) => Some(trace_execution_family_from_executor(
1007 prepared_plan
1008 .execution_family()
1009 .map_err(QueryError::execute)?,
1010 )),
1011 QueryMode::Delete(_) => None,
1012 };
1013 let reuse = query_plan_cache_reuse_event(cache_attribution);
1014
1015 Ok(QueryTracePlan::new(
1016 plan_hash,
1017 access_strategy,
1018 execution_family,
1019 reuse,
1020 explain,
1021 ))
1022 }
1023
1024 pub(crate) fn execute_load_query_paged_with_trace<E>(
1026 &self,
1027 query: &Query<E>,
1028 cursor_token: Option<&str>,
1029 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
1030 where
1031 E: PersistedRow<Canister = C> + EntityValue,
1032 {
1033 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1035 Self::ensure_scalar_paged_execution_family(
1036 plan.execution_family().map_err(QueryError::execute)?,
1037 )?;
1038
1039 let cursor_bytes = decode_optional_cursor_token(cursor_token)
1041 .map_err(QueryError::from_cursor_plan_error)?;
1042 let cursor = plan
1043 .prepare_cursor(cursor_bytes.as_deref())
1044 .map_err(query_error_from_executor_plan_error)?;
1045
1046 let (page, trace) = self
1048 .with_metrics(|| {
1049 self.load_executor::<E>()
1050 .execute_paged_with_cursor_traced(plan, cursor)
1051 })
1052 .map_err(QueryError::execute)?;
1053 finalize_scalar_paged_execution(page, trace)
1054 }
1055
1056 pub(in crate::db) fn execute_grouped<E>(
1061 &self,
1062 query: &Query<E>,
1063 cursor_token: Option<&str>,
1064 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1065 where
1066 E: PersistedRow<Canister = C> + EntityValue,
1067 {
1068 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1070
1071 let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1074
1075 Self::finalize_grouped_execution_page(page, trace)
1076 }
1077
1078 fn execute_grouped_plan_with<E, T>(
1081 &self,
1082 plan: PreparedExecutionPlan<E>,
1083 cursor_token: Option<&str>,
1084 op: impl FnOnce(
1085 LoadExecutor<E>,
1086 PreparedExecutionPlan<E>,
1087 crate::db::cursor::GroupedPlannedCursor,
1088 ) -> Result<T, InternalError>,
1089 ) -> Result<T, QueryError>
1090 where
1091 E: PersistedRow<Canister = C> + EntityValue,
1092 {
1093 Self::ensure_grouped_execution_family(
1095 plan.execution_family().map_err(QueryError::execute)?,
1096 )?;
1097
1098 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1100 .map_err(QueryError::from_cursor_plan_error)?;
1101 let cursor = plan
1102 .prepare_grouped_cursor_token(cursor)
1103 .map_err(query_error_from_executor_plan_error)?;
1104
1105 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1108 .map_err(QueryError::execute)
1109 }
1110
1111 fn execute_grouped_plan_with_trace<E>(
1113 &self,
1114 plan: PreparedExecutionPlan<E>,
1115 cursor_token: Option<&str>,
1116 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
1117 where
1118 E: PersistedRow<Canister = C> + EntityValue,
1119 {
1120 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1121 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1122 })
1123 }
1124}