1mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::diagnostics::measure_local_instruction_delta as measure_query_stage;
11#[cfg(feature = "diagnostics")]
12use crate::db::executor::{
13 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
14};
15use crate::{
16 db::{
17 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
18 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
19 TraceExecutionFamily,
20 access::summarize_executable_access_plan,
21 cursor::{
22 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
23 },
24 diagnostics::ExecutionTrace,
25 executor::{
26 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
27 ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
28 ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
29 StructuralGroupedProjectionResult,
30 },
31 query::builder::{
32 PreparedFluentAggregateExplainStrategy,
33 PreparedFluentExistingRowsTerminalRuntimeRequest,
34 PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldStrategy,
35 PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
36 PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalStrategy,
37 },
38 query::explain::{
39 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40 },
41 query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42 query::{
43 intent::{CompiledQuery, PlannedQuery},
44 plan::{AccessPlannedQuery, FieldSlot, QueryMode, VisibleIndexes},
45 },
46 session::{finalize_scalar_paged_execution, finalize_structural_grouped_projection_result},
47 },
48 error::InternalError,
49 traits::{CanisterKind, EntityKind, EntityValue},
50 types::{Decimal, Id},
51 value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65 match family {
66 ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67 ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68 ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69 }
70}
71
72pub(in crate::db::session) fn query_error_from_executor_plan_error(
75 err: ExecutorPlanError,
76) -> QueryError {
77 match err {
78 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79 }
80}
81
82#[cfg(feature = "diagnostics")]
92#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
93pub struct QueryExecutionAttribution {
94 pub compile_local_instructions: u64,
95 pub plan_lookup_local_instructions: u64,
96 pub executor_invocation_local_instructions: u64,
97 pub response_finalization_local_instructions: u64,
98 pub runtime_local_instructions: u64,
99 pub finalize_local_instructions: u64,
100 pub direct_data_row_scan_local_instructions: u64,
101 pub direct_data_row_key_stream_local_instructions: u64,
102 pub direct_data_row_row_read_local_instructions: u64,
103 pub direct_data_row_key_encode_local_instructions: u64,
104 pub direct_data_row_store_get_local_instructions: u64,
105 pub direct_data_row_order_window_local_instructions: u64,
106 pub direct_data_row_page_window_local_instructions: u64,
107 pub grouped_stream_local_instructions: u64,
108 pub grouped_fold_local_instructions: u64,
109 pub grouped_finalize_local_instructions: u64,
110 pub grouped_count_borrowed_hash_computations: u64,
111 pub grouped_count_bucket_candidate_checks: u64,
112 pub grouped_count_existing_group_hits: u64,
113 pub grouped_count_new_group_inserts: u64,
114 pub grouped_count_row_materialization_local_instructions: u64,
115 pub grouped_count_group_lookup_local_instructions: u64,
116 pub grouped_count_existing_group_update_local_instructions: u64,
117 pub grouped_count_new_group_insert_local_instructions: u64,
118 pub response_decode_local_instructions: u64,
119 pub execute_local_instructions: u64,
120 pub total_local_instructions: u64,
121 pub shared_query_plan_cache_hits: u64,
122 pub shared_query_plan_cache_misses: u64,
123}
124
125#[cfg(feature = "diagnostics")]
126#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
127struct QueryExecutePhaseAttribution {
128 executor_invocation_local_instructions: u64,
129 response_finalization_local_instructions: u64,
130 runtime_local_instructions: u64,
131 finalize_local_instructions: u64,
132 direct_data_row_scan_local_instructions: u64,
133 direct_data_row_key_stream_local_instructions: u64,
134 direct_data_row_row_read_local_instructions: u64,
135 direct_data_row_key_encode_local_instructions: u64,
136 direct_data_row_store_get_local_instructions: u64,
137 direct_data_row_order_window_local_instructions: u64,
138 direct_data_row_page_window_local_instructions: u64,
139 grouped_stream_local_instructions: u64,
140 grouped_fold_local_instructions: u64,
141 grouped_finalize_local_instructions: u64,
142 grouped_count: GroupedCountAttribution,
143}
144
145impl<C: CanisterKind> DbSession<C> {
146 #[cfg(feature = "diagnostics")]
147 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
148 QueryExecutePhaseAttribution {
149 executor_invocation_local_instructions: 0,
150 response_finalization_local_instructions: 0,
151 runtime_local_instructions: 0,
152 finalize_local_instructions: 0,
153 direct_data_row_scan_local_instructions: 0,
154 direct_data_row_key_stream_local_instructions: 0,
155 direct_data_row_row_read_local_instructions: 0,
156 direct_data_row_key_encode_local_instructions: 0,
157 direct_data_row_store_get_local_instructions: 0,
158 direct_data_row_order_window_local_instructions: 0,
159 direct_data_row_page_window_local_instructions: 0,
160 grouped_stream_local_instructions: 0,
161 grouped_fold_local_instructions: 0,
162 grouped_finalize_local_instructions: 0,
163 grouped_count: GroupedCountAttribution::none(),
164 }
165 }
166
167 #[cfg(feature = "diagnostics")]
168 const fn scalar_query_execute_phase_attribution(
169 phase: ScalarExecutePhaseAttribution,
170 executor_invocation_local_instructions: u64,
171 ) -> QueryExecutePhaseAttribution {
172 QueryExecutePhaseAttribution {
173 executor_invocation_local_instructions,
174 response_finalization_local_instructions: 0,
175 runtime_local_instructions: phase.runtime_local_instructions,
176 finalize_local_instructions: phase.finalize_local_instructions,
177 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
178 direct_data_row_key_stream_local_instructions: phase
179 .direct_data_row_key_stream_local_instructions,
180 direct_data_row_row_read_local_instructions: phase
181 .direct_data_row_row_read_local_instructions,
182 direct_data_row_key_encode_local_instructions: phase
183 .direct_data_row_key_encode_local_instructions,
184 direct_data_row_store_get_local_instructions: phase
185 .direct_data_row_store_get_local_instructions,
186 direct_data_row_order_window_local_instructions: phase
187 .direct_data_row_order_window_local_instructions,
188 direct_data_row_page_window_local_instructions: phase
189 .direct_data_row_page_window_local_instructions,
190 grouped_stream_local_instructions: 0,
191 grouped_fold_local_instructions: 0,
192 grouped_finalize_local_instructions: 0,
193 grouped_count: GroupedCountAttribution::none(),
194 }
195 }
196
197 #[cfg(feature = "diagnostics")]
198 const fn grouped_query_execute_phase_attribution(
199 phase: GroupedExecutePhaseAttribution,
200 executor_invocation_local_instructions: u64,
201 response_finalization_local_instructions: u64,
202 ) -> QueryExecutePhaseAttribution {
203 QueryExecutePhaseAttribution {
204 executor_invocation_local_instructions,
205 response_finalization_local_instructions,
206 runtime_local_instructions: phase
207 .stream_local_instructions
208 .saturating_add(phase.fold_local_instructions),
209 finalize_local_instructions: phase.finalize_local_instructions,
210 direct_data_row_scan_local_instructions: 0,
211 direct_data_row_key_stream_local_instructions: 0,
212 direct_data_row_row_read_local_instructions: 0,
213 direct_data_row_key_encode_local_instructions: 0,
214 direct_data_row_store_get_local_instructions: 0,
215 direct_data_row_order_window_local_instructions: 0,
216 direct_data_row_page_window_local_instructions: 0,
217 grouped_stream_local_instructions: phase.stream_local_instructions,
218 grouped_fold_local_instructions: phase.fold_local_instructions,
219 grouped_finalize_local_instructions: phase.finalize_local_instructions,
220 grouped_count: phase.grouped_count,
221 }
222 }
223
224 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
227 &self,
228 query: &Query<E>,
229 ) -> Result<CompiledQuery<E>, QueryError>
230 where
231 E: EntityKind<Canister = C>,
232 {
233 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
234 }
235
236 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
239 &self,
240 query: &Query<E>,
241 ) -> Result<PlannedQuery<E>, QueryError>
242 where
243 E: EntityKind<Canister = C>,
244 {
245 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
246 }
247
248 fn try_map_cached_logical_query_plan<E, T>(
251 &self,
252 query: &Query<E>,
253 map: impl FnOnce(&AccessPlannedQuery) -> Result<T, QueryError>,
254 ) -> Result<T, QueryError>
255 where
256 E: EntityKind<Canister = C>,
257 {
258 self.try_map_cached_shared_query_plan_ref_for_entity::<E, T>(query, |prepared_plan| {
259 map(prepared_plan.logical_plan())
260 })
261 }
262
263 fn cached_finalized_execution_explain_plan<E>(
267 &self,
268 query: &Query<E>,
269 visible_indexes: &VisibleIndexes<'_>,
270 ) -> Result<(AccessPlannedQuery, QueryPlanCacheAttribution), QueryError>
271 where
272 E: EntityKind<Canister = C>,
273 {
274 let (prepared_plan, cache_attribution) =
275 self.cached_shared_query_plan_for_entity::<E>(query)?;
276 let mut plan = prepared_plan.logical_plan().clone();
277
278 plan.finalize_access_choice_for_model_with_indexes(
279 query.structural().model(),
280 visible_indexes.as_slice(),
281 );
282
283 Ok((plan, cache_attribution))
284 }
285
286 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
288 &self,
289 query: &Query<E>,
290 ) -> Result<ExplainPlan, QueryError>
291 where
292 E: EntityKind<Canister = C>,
293 {
294 self.try_map_cached_logical_query_plan(query, |plan| Ok(plan.explain()))
295 }
296
297 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
300 &self,
301 query: &Query<E>,
302 ) -> Result<String, QueryError>
303 where
304 E: EntityKind<Canister = C>,
305 {
306 self.try_map_cached_logical_query_plan(query, |plan| Ok(plan.fingerprint().to_string()))
307 }
308
309 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
312 &self,
313 query: &Query<E>,
314 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
315 where
316 E: EntityValue + EntityKind<Canister = C>,
317 {
318 self.with_query_visible_indexes(query, |query, visible_indexes| {
319 let (plan, _) =
320 self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
321
322 query
323 .structural()
324 .explain_execution_descriptor_from_plan(&plan)
325 })
326 }
327
328 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
331 &self,
332 query: &Query<E>,
333 ) -> Result<String, QueryError>
334 where
335 E: EntityValue + EntityKind<Canister = C>,
336 {
337 self.with_query_visible_indexes(query, |query, visible_indexes| {
338 let (plan, cache_attribution) =
339 self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
340
341 query
342 .structural()
343 .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
344 &plan,
345 Some(query_plan_cache_reuse_event(cache_attribution)),
346 |_| {},
347 )
348 .map(|diagnostics| diagnostics.render_text_verbose())
349 })
350 }
351
352 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
355 &self,
356 query: &Query<E>,
357 strategy: &S,
358 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
359 where
360 E: EntityValue + EntityKind<Canister = C>,
361 S: PreparedFluentAggregateExplainStrategy,
362 {
363 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
364
365 plan.explain_prepared_aggregate_terminal(strategy)
366 }
367
368 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
371 &self,
372 query: &Query<E>,
373 target_field: &str,
374 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
375 where
376 E: EntityValue + EntityKind<Canister = C>,
377 {
378 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
379
380 plan.explain_bytes_by_terminal(target_field)
381 }
382
383 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
386 &self,
387 query: &Query<E>,
388 strategy: &PreparedFluentProjectionStrategy,
389 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
390 where
391 E: EntityValue + EntityKind<Canister = C>,
392 {
393 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
394
395 plan.explain_prepared_projection_terminal(strategy)
396 }
397
398 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
401 match family {
402 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
403 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
404 )),
405 ExecutionFamily::Ordered => Ok(()),
406 ExecutionFamily::Grouped => Err(QueryError::invariant(
407 "grouped queries execute via execute(), not page().execute()",
408 )),
409 }
410 }
411
412 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
415 match family {
416 ExecutionFamily::Grouped => Ok(()),
417 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
418 "grouped execution requires grouped logical plans",
419 )),
420 }
421 }
422
423 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
425 where
426 E: PersistedRow<Canister = C> + EntityValue,
427 {
428 let mode = query.mode();
430 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
431
432 self.execute_query_dyn(mode, plan)
434 }
435
436 #[cfg(feature = "diagnostics")]
439 #[doc(hidden)]
440 #[expect(
441 clippy::too_many_lines,
442 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
443 )]
444 #[expect(
445 clippy::needless_update,
446 reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
447 )]
448 pub fn execute_query_result_with_attribution<E>(
449 &self,
450 query: &Query<E>,
451 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
452 where
453 E: PersistedRow<Canister = C> + EntityValue,
454 {
455 let (plan_lookup_local_instructions, plan_and_cache) =
460 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
461 let (plan, cache_attribution) = plan_and_cache?;
462 let compile_local_instructions = plan_lookup_local_instructions;
463
464 let result =
467 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
468 if query.has_grouping() {
469 let (executor_invocation_local_instructions, grouped_page) =
470 measure_query_stage(|| {
471 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
472 executor
473 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
474 plan, cursor,
475 )
476 })
477 });
478 let (result, trace, phase_attribution) = grouped_page?;
479 let (response_finalization_local_instructions, grouped) =
480 measure_query_stage(|| {
481 finalize_structural_grouped_projection_result(result, trace)
482 });
483 let grouped = grouped?;
484
485 Ok((
486 LoadQueryResult::Grouped(grouped),
487 Self::grouped_query_execute_phase_attribution(
488 phase_attribution,
489 executor_invocation_local_instructions,
490 response_finalization_local_instructions,
491 ),
492 0,
493 ))
494 } else {
495 match query.mode() {
496 QueryMode::Load(_) => {
497 let (executor_invocation_local_instructions, executed) =
498 measure_query_stage(|| {
499 self.load_executor::<E>()
500 .execute_with_phase_attribution(plan)
501 .map_err(QueryError::execute)
502 });
503 let (rows, phase_attribution, response_decode_local_instructions) =
504 executed?;
505
506 Ok((
507 LoadQueryResult::Rows(rows),
508 Self::scalar_query_execute_phase_attribution(
509 phase_attribution,
510 executor_invocation_local_instructions,
511 ),
512 response_decode_local_instructions,
513 ))
514 }
515 QueryMode::Delete(_) => {
516 let (executor_invocation_local_instructions, result) =
517 measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
518 let result = result?;
519
520 Ok((
521 LoadQueryResult::Rows(result),
522 QueryExecutePhaseAttribution {
523 executor_invocation_local_instructions,
524 ..Self::empty_query_execute_phase_attribution()
525 },
526 0,
527 ))
528 }
529 }
530 }
531 }();
532 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
533 let execute_local_instructions = execute_phase_attribution
534 .executor_invocation_local_instructions
535 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
536 let total_local_instructions =
537 compile_local_instructions.saturating_add(execute_local_instructions);
538
539 Ok((
540 result,
541 QueryExecutionAttribution {
542 compile_local_instructions,
543 plan_lookup_local_instructions,
544 executor_invocation_local_instructions: execute_phase_attribution
545 .executor_invocation_local_instructions,
546 response_finalization_local_instructions: execute_phase_attribution
547 .response_finalization_local_instructions,
548 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
549 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
550 direct_data_row_scan_local_instructions: execute_phase_attribution
551 .direct_data_row_scan_local_instructions,
552 direct_data_row_key_stream_local_instructions: execute_phase_attribution
553 .direct_data_row_key_stream_local_instructions,
554 direct_data_row_row_read_local_instructions: execute_phase_attribution
555 .direct_data_row_row_read_local_instructions,
556 direct_data_row_key_encode_local_instructions: execute_phase_attribution
557 .direct_data_row_key_encode_local_instructions,
558 direct_data_row_store_get_local_instructions: execute_phase_attribution
559 .direct_data_row_store_get_local_instructions,
560 direct_data_row_order_window_local_instructions: execute_phase_attribution
561 .direct_data_row_order_window_local_instructions,
562 direct_data_row_page_window_local_instructions: execute_phase_attribution
563 .direct_data_row_page_window_local_instructions,
564 grouped_stream_local_instructions: execute_phase_attribution
565 .grouped_stream_local_instructions,
566 grouped_fold_local_instructions: execute_phase_attribution
567 .grouped_fold_local_instructions,
568 grouped_finalize_local_instructions: execute_phase_attribution
569 .grouped_finalize_local_instructions,
570 grouped_count_borrowed_hash_computations: execute_phase_attribution
571 .grouped_count
572 .borrowed_hash_computations,
573 grouped_count_bucket_candidate_checks: execute_phase_attribution
574 .grouped_count
575 .bucket_candidate_checks,
576 grouped_count_existing_group_hits: execute_phase_attribution
577 .grouped_count
578 .existing_group_hits,
579 grouped_count_new_group_inserts: execute_phase_attribution
580 .grouped_count
581 .new_group_inserts,
582 grouped_count_row_materialization_local_instructions: execute_phase_attribution
583 .grouped_count
584 .row_materialization_local_instructions,
585 grouped_count_group_lookup_local_instructions: execute_phase_attribution
586 .grouped_count
587 .group_lookup_local_instructions,
588 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
589 .grouped_count
590 .existing_group_update_local_instructions,
591 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
592 .grouped_count
593 .new_group_insert_local_instructions,
594 response_decode_local_instructions,
595 execute_local_instructions,
596 total_local_instructions,
597 shared_query_plan_cache_hits: cache_attribution.hits,
598 shared_query_plan_cache_misses: cache_attribution.misses,
599 ..QueryExecutionAttribution::default()
600 },
601 ))
602 }
603
604 #[doc(hidden)]
607 pub fn execute_query_result<E>(
608 &self,
609 query: &Query<E>,
610 ) -> Result<LoadQueryResult<E>, QueryError>
611 where
612 E: PersistedRow<Canister = C> + EntityValue,
613 {
614 if query.has_grouping() {
615 return self
616 .execute_grouped(query, None)
617 .map(LoadQueryResult::Grouped);
618 }
619
620 self.execute_query(query).map(LoadQueryResult::Rows)
621 }
622
623 #[doc(hidden)]
625 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
626 where
627 E: PersistedRow<Canister = C> + EntityValue,
628 {
629 if !query.mode().is_delete() {
631 return Err(QueryError::unsupported_query(
632 "delete count execution requires delete query mode",
633 ));
634 }
635
636 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
640
641 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
643 .map_err(QueryError::execute)
644 }
645
646 pub(in crate::db) fn execute_query_dyn<E>(
651 &self,
652 mode: QueryMode,
653 plan: PreparedExecutionPlan<E>,
654 ) -> Result<EntityResponse<E>, QueryError>
655 where
656 E: PersistedRow<Canister = C> + EntityValue,
657 {
658 let result = match mode {
659 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
660 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
661 };
662
663 result.map_err(QueryError::execute)
664 }
665
666 pub(in crate::db) fn execute_load_query_with<E, T>(
669 &self,
670 query: &Query<E>,
671 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
672 ) -> Result<T, QueryError>
673 where
674 E: PersistedRow<Canister = C> + EntityValue,
675 {
676 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
677
678 self.with_metrics(|| op(self.load_executor::<E>(), plan))
679 .map_err(QueryError::execute)
680 }
681
682 fn execute_scalar_terminal_boundary<E>(
685 &self,
686 query: &Query<E>,
687 request: ScalarTerminalBoundaryRequest,
688 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
689 where
690 E: PersistedRow<Canister = C> + EntityValue,
691 {
692 self.execute_load_query_with(query, move |load, plan| {
693 load.execute_scalar_terminal_request(plan, request)
694 })
695 }
696
697 fn execute_scalar_projection_boundary<E>(
700 &self,
701 query: &Query<E>,
702 target_field: FieldSlot,
703 request: ScalarProjectionBoundaryRequest,
704 ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
705 where
706 E: PersistedRow<Canister = C> + EntityValue,
707 {
708 self.execute_load_query_with(query, move |load, plan| {
709 load.execute_scalar_projection_boundary(plan, target_field, request)
710 })
711 }
712
713 pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
716 &self,
717 query: &Query<E>,
718 strategy: PreparedFluentExistingRowsTerminalStrategy,
719 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
720 where
721 E: PersistedRow<Canister = C> + EntityValue,
722 {
723 let (request, output_shape) = strategy.into_executor_request();
724 let output = self.execute_scalar_terminal_boundary(query, request)?;
725
726 match output_shape {
727 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => output
728 .into_count()
729 .map(FluentScalarTerminalOutput::Count)
730 .map_err(QueryError::execute),
731 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => output
732 .into_exists()
733 .map(FluentScalarTerminalOutput::Exists)
734 .map_err(QueryError::execute),
735 }
736 }
737
738 pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
741 &self,
742 query: &Query<E>,
743 strategy: PreparedFluentScalarTerminalStrategy,
744 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
745 where
746 E: PersistedRow<Canister = C> + EntityValue,
747 {
748 let request = strategy.into_executor_request();
749
750 self.execute_scalar_terminal_boundary(query, request)?
751 .into_id::<E>()
752 .map(FluentScalarTerminalOutput::Id)
753 .map_err(QueryError::execute)
754 }
755
756 pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
759 &self,
760 query: &Query<E>,
761 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
762 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
763 where
764 E: PersistedRow<Canister = C> + EntityValue,
765 {
766 let (request, returns_id_pair) = strategy.into_executor_request();
767 let output = self.execute_scalar_terminal_boundary(query, request)?;
768
769 if returns_id_pair {
770 return output
771 .into_id_pair::<E>()
772 .map(FluentScalarTerminalOutput::IdPair)
773 .map_err(QueryError::execute);
774 }
775
776 output
777 .into_id::<E>()
778 .map(FluentScalarTerminalOutput::Id)
779 .map_err(QueryError::execute)
780 }
781
782 pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
785 &self,
786 query: &Query<E>,
787 strategy: PreparedFluentNumericFieldStrategy,
788 ) -> Result<Option<Decimal>, QueryError>
789 where
790 E: PersistedRow<Canister = C> + EntityValue,
791 {
792 let (target_field, request) = strategy.into_executor_request();
793
794 self.execute_load_query_with(query, move |load, plan| {
795 load.execute_numeric_field_boundary(plan, target_field, request)
796 })
797 }
798
799 pub(in crate::db) fn execute_fluent_projection_terminal<E>(
802 &self,
803 query: &Query<E>,
804 strategy: PreparedFluentProjectionStrategy,
805 ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
806 where
807 E: PersistedRow<Canister = C> + EntityValue,
808 {
809 let (target_field, request, output_shape) = strategy.into_executor_request();
810 let output = self.execute_scalar_projection_boundary(query, target_field, request)?;
811
812 match output_shape {
813 PreparedFluentProjectionRuntimeRequest::Values
814 | PreparedFluentProjectionRuntimeRequest::DistinctValues => output
815 .into_values()
816 .map(FluentProjectionTerminalOutput::Values)
817 .map_err(QueryError::execute),
818 PreparedFluentProjectionRuntimeRequest::CountDistinct => output
819 .into_count()
820 .map(FluentProjectionTerminalOutput::Count)
821 .map_err(QueryError::execute),
822 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => output
823 .into_values_with_ids::<E>()
824 .map(FluentProjectionTerminalOutput::ValuesWithIds)
825 .map_err(QueryError::execute),
826 PreparedFluentProjectionRuntimeRequest::TerminalValue { .. } => output
827 .into_terminal_value()
828 .map(FluentProjectionTerminalOutput::TerminalValue)
829 .map_err(QueryError::execute),
830 }
831 }
832
833 pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
836 where
837 E: PersistedRow<Canister = C> + EntityValue,
838 {
839 self.execute_load_query_with(query, |load, plan| load.bytes(plan))
840 }
841
842 pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
844 &self,
845 query: &Query<E>,
846 target_slot: FieldSlot,
847 ) -> Result<u64, QueryError>
848 where
849 E: PersistedRow<Canister = C> + EntityValue,
850 {
851 self.execute_load_query_with(query, move |load, plan| {
852 load.bytes_by_slot(plan, target_slot)
853 })
854 }
855
856 pub(in crate::db) fn execute_fluent_take<E>(
858 &self,
859 query: &Query<E>,
860 take_count: u32,
861 ) -> Result<EntityResponse<E>, QueryError>
862 where
863 E: PersistedRow<Canister = C> + EntityValue,
864 {
865 self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
866 }
867
868 pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
870 &self,
871 query: &Query<E>,
872 target_slot: FieldSlot,
873 take_count: u32,
874 descending: bool,
875 ) -> Result<EntityResponse<E>, QueryError>
876 where
877 E: PersistedRow<Canister = C> + EntityValue,
878 {
879 self.execute_load_query_with(query, move |load, plan| {
880 if descending {
881 load.top_k_by_slot(plan, target_slot, take_count)
882 } else {
883 load.bottom_k_by_slot(plan, target_slot, take_count)
884 }
885 })
886 }
887
888 pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
890 &self,
891 query: &Query<E>,
892 target_slot: FieldSlot,
893 take_count: u32,
894 descending: bool,
895 ) -> Result<Vec<Value>, QueryError>
896 where
897 E: PersistedRow<Canister = C> + EntityValue,
898 {
899 self.execute_load_query_with(query, move |load, plan| {
900 if descending {
901 load.top_k_by_values_slot(plan, target_slot, take_count)
902 } else {
903 load.bottom_k_by_values_slot(plan, target_slot, take_count)
904 }
905 })
906 }
907
908 pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
910 &self,
911 query: &Query<E>,
912 target_slot: FieldSlot,
913 take_count: u32,
914 descending: bool,
915 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
916 where
917 E: PersistedRow<Canister = C> + EntityValue,
918 {
919 self.execute_load_query_with(query, move |load, plan| {
920 if descending {
921 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
922 } else {
923 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
924 }
925 })
926 }
927
928 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
933 where
934 E: EntityKind<Canister = C>,
935 {
936 let (prepared_plan, cache_attribution) =
937 self.cached_prepared_query_plan_for_entity::<E>(query)?;
938 let logical_plan = prepared_plan.logical_plan();
939 let explain = logical_plan.explain();
940 let plan_hash = logical_plan.fingerprint().to_string();
941 let executable_access = prepared_plan.access().executable_contract();
942 let access_strategy = summarize_executable_access_plan(&executable_access);
943 let execution_family = match query.mode() {
944 QueryMode::Load(_) => Some(trace_execution_family_from_executor(
945 prepared_plan
946 .execution_family()
947 .map_err(QueryError::execute)?,
948 )),
949 QueryMode::Delete(_) => None,
950 };
951 let reuse = query_plan_cache_reuse_event(cache_attribution);
952
953 Ok(QueryTracePlan::new(
954 plan_hash,
955 access_strategy,
956 execution_family,
957 reuse,
958 explain,
959 ))
960 }
961
962 pub(crate) fn execute_load_query_paged_with_trace<E>(
964 &self,
965 query: &Query<E>,
966 cursor_token: Option<&str>,
967 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
968 where
969 E: PersistedRow<Canister = C> + EntityValue,
970 {
971 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
973 Self::ensure_scalar_paged_execution_family(
974 plan.execution_family().map_err(QueryError::execute)?,
975 )?;
976
977 let cursor_bytes = decode_optional_cursor_token(cursor_token)
979 .map_err(QueryError::from_cursor_plan_error)?;
980 let cursor = plan
981 .prepare_cursor(cursor_bytes.as_deref())
982 .map_err(query_error_from_executor_plan_error)?;
983
984 let (page, trace) = self
986 .with_metrics(|| {
987 self.load_executor::<E>()
988 .execute_paged_with_cursor_traced(plan, cursor)
989 })
990 .map_err(QueryError::execute)?;
991 finalize_scalar_paged_execution(page, trace)
992 }
993
994 pub(in crate::db) fn execute_grouped<E>(
999 &self,
1000 query: &Query<E>,
1001 cursor_token: Option<&str>,
1002 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1003 where
1004 E: PersistedRow<Canister = C> + EntityValue,
1005 {
1006 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1008
1009 let (result, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1012
1013 finalize_structural_grouped_projection_result(result, trace)
1014 }
1015
1016 pub(in crate::db::session) fn execute_grouped_plan_with<E, T>(
1019 &self,
1020 plan: PreparedExecutionPlan<E>,
1021 cursor_token: Option<&str>,
1022 op: impl FnOnce(
1023 LoadExecutor<E>,
1024 PreparedExecutionPlan<E>,
1025 crate::db::cursor::GroupedPlannedCursor,
1026 ) -> Result<T, InternalError>,
1027 ) -> Result<T, QueryError>
1028 where
1029 E: PersistedRow<Canister = C> + EntityValue,
1030 {
1031 Self::ensure_grouped_execution_family(
1033 plan.execution_family().map_err(QueryError::execute)?,
1034 )?;
1035
1036 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1038 .map_err(QueryError::from_cursor_plan_error)?;
1039 let cursor = plan
1040 .prepare_grouped_cursor_token(cursor)
1041 .map_err(query_error_from_executor_plan_error)?;
1042
1043 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1046 .map_err(QueryError::execute)
1047 }
1048
1049 pub(in crate::db::session) fn execute_grouped_plan_with_trace<E>(
1051 &self,
1052 plan: PreparedExecutionPlan<E>,
1053 cursor_token: Option<&str>,
1054 ) -> Result<(StructuralGroupedProjectionResult, Option<ExecutionTrace>), QueryError>
1055 where
1056 E: PersistedRow<Canister = C> + EntityValue,
1057 {
1058 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1059 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1060 })
1061 }
1062}