1mod cache;
8
9#[cfg(feature = "diagnostics")]
10use crate::db::diagnostics::measure_local_instruction_delta as measure_query_stage;
11#[cfg(feature = "diagnostics")]
12use crate::db::executor::{
13 GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
14};
15use crate::{
16 db::{
17 DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
18 PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
19 TraceExecutionFamily,
20 access::summarize_executable_access_plan,
21 cursor::{
22 CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
23 },
24 diagnostics::ExecutionTrace,
25 executor::{
26 ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
27 ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
28 ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
29 StructuralGroupedProjectionResult,
30 },
31 query::builder::{
32 PreparedFluentAggregateExplainStrategy,
33 PreparedFluentExistingRowsTerminalRuntimeRequest,
34 PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldStrategy,
35 PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
36 PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalStrategy,
37 },
38 query::explain::{
39 ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
40 },
41 query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
42 query::{
43 intent::{CompiledQuery, PlannedQuery},
44 plan::{AccessPlannedQuery, FieldSlot, QueryMode, VisibleIndexes},
45 },
46 session::{finalize_scalar_paged_execution, finalize_structural_grouped_projection_result},
47 },
48 error::InternalError,
49 traits::{CanisterKind, EntityKind, EntityValue},
50 types::{Decimal, Id},
51 value::Value,
52};
53pub(in crate::db) use cache::QueryPlanCacheAttribution;
54#[cfg(test)]
55pub(in crate::db) use cache::QueryPlanVisibility;
56pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
57#[cfg(feature = "diagnostics")]
58use candid::CandidType;
59#[cfg(feature = "diagnostics")]
60use serde::Deserialize;
61
62const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
65 match family {
66 ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
67 ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
68 ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
69 }
70}
71
72pub(in crate::db::session) fn query_error_from_executor_plan_error(
75 err: ExecutorPlanError,
76) -> QueryError {
77 match err {
78 ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
79 }
80}
81
82#[cfg(feature = "diagnostics")]
92#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
93pub struct QueryExecutionAttribution {
94 pub compile_local_instructions: u64,
95 pub plan_lookup_local_instructions: u64,
96 pub executor_invocation_local_instructions: u64,
97 pub response_finalization_local_instructions: u64,
98 pub runtime_local_instructions: u64,
99 pub finalize_local_instructions: u64,
100 pub direct_data_row_scan_local_instructions: u64,
101 pub direct_data_row_key_stream_local_instructions: u64,
102 pub direct_data_row_row_read_local_instructions: u64,
103 pub direct_data_row_key_encode_local_instructions: u64,
104 pub direct_data_row_store_get_local_instructions: u64,
105 pub direct_data_row_order_window_local_instructions: u64,
106 pub direct_data_row_page_window_local_instructions: u64,
107 pub grouped_stream_local_instructions: u64,
108 pub grouped_fold_local_instructions: u64,
109 pub grouped_finalize_local_instructions: u64,
110 pub grouped_count_borrowed_hash_computations: u64,
111 pub grouped_count_bucket_candidate_checks: u64,
112 pub grouped_count_existing_group_hits: u64,
113 pub grouped_count_new_group_inserts: u64,
114 pub grouped_count_row_materialization_local_instructions: u64,
115 pub grouped_count_group_lookup_local_instructions: u64,
116 pub grouped_count_existing_group_update_local_instructions: u64,
117 pub grouped_count_new_group_insert_local_instructions: u64,
118 pub response_decode_local_instructions: u64,
119 pub execute_local_instructions: u64,
120 pub total_local_instructions: u64,
121 pub shared_query_plan_cache_hits: u64,
122 pub shared_query_plan_cache_misses: u64,
123}
124
125#[cfg(feature = "diagnostics")]
126#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
127struct QueryExecutePhaseAttribution {
128 executor_invocation_local_instructions: u64,
129 response_finalization_local_instructions: u64,
130 runtime_local_instructions: u64,
131 finalize_local_instructions: u64,
132 direct_data_row_scan_local_instructions: u64,
133 direct_data_row_key_stream_local_instructions: u64,
134 direct_data_row_row_read_local_instructions: u64,
135 direct_data_row_key_encode_local_instructions: u64,
136 direct_data_row_store_get_local_instructions: u64,
137 direct_data_row_order_window_local_instructions: u64,
138 direct_data_row_page_window_local_instructions: u64,
139 grouped_stream_local_instructions: u64,
140 grouped_fold_local_instructions: u64,
141 grouped_finalize_local_instructions: u64,
142 grouped_count: GroupedCountAttribution,
143}
144
145impl<C: CanisterKind> DbSession<C> {
146 #[cfg(feature = "diagnostics")]
147 const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
148 QueryExecutePhaseAttribution {
149 executor_invocation_local_instructions: 0,
150 response_finalization_local_instructions: 0,
151 runtime_local_instructions: 0,
152 finalize_local_instructions: 0,
153 direct_data_row_scan_local_instructions: 0,
154 direct_data_row_key_stream_local_instructions: 0,
155 direct_data_row_row_read_local_instructions: 0,
156 direct_data_row_key_encode_local_instructions: 0,
157 direct_data_row_store_get_local_instructions: 0,
158 direct_data_row_order_window_local_instructions: 0,
159 direct_data_row_page_window_local_instructions: 0,
160 grouped_stream_local_instructions: 0,
161 grouped_fold_local_instructions: 0,
162 grouped_finalize_local_instructions: 0,
163 grouped_count: GroupedCountAttribution::none(),
164 }
165 }
166
167 #[cfg(feature = "diagnostics")]
168 const fn scalar_query_execute_phase_attribution(
169 phase: ScalarExecutePhaseAttribution,
170 executor_invocation_local_instructions: u64,
171 ) -> QueryExecutePhaseAttribution {
172 QueryExecutePhaseAttribution {
173 executor_invocation_local_instructions,
174 response_finalization_local_instructions: 0,
175 runtime_local_instructions: phase.runtime_local_instructions,
176 finalize_local_instructions: phase.finalize_local_instructions,
177 direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
178 direct_data_row_key_stream_local_instructions: phase
179 .direct_data_row_key_stream_local_instructions,
180 direct_data_row_row_read_local_instructions: phase
181 .direct_data_row_row_read_local_instructions,
182 direct_data_row_key_encode_local_instructions: phase
183 .direct_data_row_key_encode_local_instructions,
184 direct_data_row_store_get_local_instructions: phase
185 .direct_data_row_store_get_local_instructions,
186 direct_data_row_order_window_local_instructions: phase
187 .direct_data_row_order_window_local_instructions,
188 direct_data_row_page_window_local_instructions: phase
189 .direct_data_row_page_window_local_instructions,
190 grouped_stream_local_instructions: 0,
191 grouped_fold_local_instructions: 0,
192 grouped_finalize_local_instructions: 0,
193 grouped_count: GroupedCountAttribution::none(),
194 }
195 }
196
197 #[cfg(feature = "diagnostics")]
198 const fn grouped_query_execute_phase_attribution(
199 phase: GroupedExecutePhaseAttribution,
200 executor_invocation_local_instructions: u64,
201 response_finalization_local_instructions: u64,
202 ) -> QueryExecutePhaseAttribution {
203 QueryExecutePhaseAttribution {
204 executor_invocation_local_instructions,
205 response_finalization_local_instructions,
206 runtime_local_instructions: phase
207 .stream_local_instructions
208 .saturating_add(phase.fold_local_instructions),
209 finalize_local_instructions: phase.finalize_local_instructions,
210 direct_data_row_scan_local_instructions: 0,
211 direct_data_row_key_stream_local_instructions: 0,
212 direct_data_row_row_read_local_instructions: 0,
213 direct_data_row_key_encode_local_instructions: 0,
214 direct_data_row_store_get_local_instructions: 0,
215 direct_data_row_order_window_local_instructions: 0,
216 direct_data_row_page_window_local_instructions: 0,
217 grouped_stream_local_instructions: phase.stream_local_instructions,
218 grouped_fold_local_instructions: phase.fold_local_instructions,
219 grouped_finalize_local_instructions: phase.finalize_local_instructions,
220 grouped_count: phase.grouped_count,
221 }
222 }
223
224 pub(in crate::db) fn compile_query_with_visible_indexes<E>(
227 &self,
228 query: &Query<E>,
229 ) -> Result<CompiledQuery<E>, QueryError>
230 where
231 E: EntityKind<Canister = C>,
232 {
233 self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_plan)
234 }
235
236 pub(in crate::db) fn planned_query_with_visible_indexes<E>(
239 &self,
240 query: &Query<E>,
241 ) -> Result<PlannedQuery<E>, QueryError>
242 where
243 E: EntityKind<Canister = C>,
244 {
245 self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_plan)
246 }
247
248 fn cached_logical_query_plan<E>(
251 &self,
252 query: &Query<E>,
253 ) -> Result<AccessPlannedQuery, QueryError>
254 where
255 E: EntityKind<Canister = C>,
256 {
257 let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
258
259 Ok(prepared_plan.logical_plan().clone())
260 }
261
262 fn cached_finalized_execution_explain_plan<E>(
266 &self,
267 query: &Query<E>,
268 visible_indexes: &VisibleIndexes<'_>,
269 ) -> Result<(AccessPlannedQuery, QueryPlanCacheAttribution), QueryError>
270 where
271 E: EntityKind<Canister = C>,
272 {
273 let (prepared_plan, cache_attribution) =
274 self.cached_shared_query_plan_for_entity::<E>(query)?;
275 let mut plan = prepared_plan.logical_plan().clone();
276
277 plan.finalize_access_choice_for_model_with_indexes(
278 query.structural().model(),
279 visible_indexes.as_slice(),
280 );
281
282 Ok((plan, cache_attribution))
283 }
284
285 pub(in crate::db) fn explain_query_with_visible_indexes<E>(
287 &self,
288 query: &Query<E>,
289 ) -> Result<ExplainPlan, QueryError>
290 where
291 E: EntityKind<Canister = C>,
292 {
293 let plan = self.cached_logical_query_plan(query)?;
294
295 Ok(plan.explain())
296 }
297
298 pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
301 &self,
302 query: &Query<E>,
303 ) -> Result<String, QueryError>
304 where
305 E: EntityKind<Canister = C>,
306 {
307 let plan = self.cached_logical_query_plan(query)?;
308
309 Ok(plan.fingerprint().to_string())
310 }
311
312 pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
315 &self,
316 query: &Query<E>,
317 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
318 where
319 E: EntityValue + EntityKind<Canister = C>,
320 {
321 self.with_query_visible_indexes(query, |query, visible_indexes| {
322 let (plan, _) =
323 self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
324
325 query
326 .structural()
327 .explain_execution_descriptor_from_plan(&plan)
328 })
329 }
330
331 pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
334 &self,
335 query: &Query<E>,
336 ) -> Result<String, QueryError>
337 where
338 E: EntityValue + EntityKind<Canister = C>,
339 {
340 self.with_query_visible_indexes(query, |query, visible_indexes| {
341 let (plan, cache_attribution) =
342 self.cached_finalized_execution_explain_plan::<E>(query, visible_indexes)?;
343
344 query
345 .structural()
346 .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
347 &plan,
348 Some(query_plan_cache_reuse_event(cache_attribution)),
349 |_| {},
350 )
351 .map(|diagnostics| diagnostics.render_text_verbose())
352 })
353 }
354
355 pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
358 &self,
359 query: &Query<E>,
360 strategy: &S,
361 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
362 where
363 E: EntityValue + EntityKind<Canister = C>,
364 S: PreparedFluentAggregateExplainStrategy,
365 {
366 self.with_query_visible_indexes(query, |query, visible_indexes| {
367 query
368 .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
369 })
370 }
371
372 pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
375 &self,
376 query: &Query<E>,
377 target_field: &str,
378 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
379 where
380 E: EntityValue + EntityKind<Canister = C>,
381 {
382 self.with_query_visible_indexes(query, |query, visible_indexes| {
383 query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
384 })
385 }
386
387 pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
390 &self,
391 query: &Query<E>,
392 strategy: &PreparedFluentProjectionStrategy,
393 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
394 where
395 E: EntityValue + EntityKind<Canister = C>,
396 {
397 self.with_query_visible_indexes(query, |query, visible_indexes| {
398 query.explain_prepared_projection_terminal_with_visible_indexes(
399 visible_indexes,
400 strategy,
401 )
402 })
403 }
404
405 fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
408 match family {
409 ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
410 CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
411 )),
412 ExecutionFamily::Ordered => Ok(()),
413 ExecutionFamily::Grouped => Err(QueryError::invariant(
414 "grouped queries execute via execute(), not page().execute()",
415 )),
416 }
417 }
418
419 fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
422 match family {
423 ExecutionFamily::Grouped => Ok(()),
424 ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
425 "grouped execution requires grouped logical plans",
426 )),
427 }
428 }
429
430 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
432 where
433 E: PersistedRow<Canister = C> + EntityValue,
434 {
435 let mode = query.mode();
437 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
438
439 self.execute_query_dyn(mode, plan)
441 }
442
443 #[cfg(feature = "diagnostics")]
446 #[doc(hidden)]
447 #[expect(
448 clippy::too_many_lines,
449 reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
450 )]
451 #[expect(
452 clippy::needless_update,
453 reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
454 )]
455 pub fn execute_query_result_with_attribution<E>(
456 &self,
457 query: &Query<E>,
458 ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
459 where
460 E: PersistedRow<Canister = C> + EntityValue,
461 {
462 let (plan_lookup_local_instructions, plan_and_cache) =
467 measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
468 let (plan, cache_attribution) = plan_and_cache?;
469 let compile_local_instructions = plan_lookup_local_instructions;
470
471 let result =
474 || -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
475 if query.has_grouping() {
476 let (executor_invocation_local_instructions, grouped_page) =
477 measure_query_stage(|| {
478 self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
479 executor
480 .execute_grouped_paged_with_cursor_traced_with_phase_attribution(
481 plan, cursor,
482 )
483 })
484 });
485 let (result, trace, phase_attribution) = grouped_page?;
486 let (response_finalization_local_instructions, grouped) =
487 measure_query_stage(|| {
488 finalize_structural_grouped_projection_result(result, trace)
489 });
490 let grouped = grouped?;
491
492 Ok((
493 LoadQueryResult::Grouped(grouped),
494 Self::grouped_query_execute_phase_attribution(
495 phase_attribution,
496 executor_invocation_local_instructions,
497 response_finalization_local_instructions,
498 ),
499 0,
500 ))
501 } else {
502 match query.mode() {
503 QueryMode::Load(_) => {
504 let (executor_invocation_local_instructions, executed) =
505 measure_query_stage(|| {
506 self.load_executor::<E>()
507 .execute_with_phase_attribution(plan)
508 .map_err(QueryError::execute)
509 });
510 let (rows, phase_attribution, response_decode_local_instructions) =
511 executed?;
512
513 Ok((
514 LoadQueryResult::Rows(rows),
515 Self::scalar_query_execute_phase_attribution(
516 phase_attribution,
517 executor_invocation_local_instructions,
518 ),
519 response_decode_local_instructions,
520 ))
521 }
522 QueryMode::Delete(_) => {
523 let (executor_invocation_local_instructions, result) =
524 measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
525 let result = result?;
526
527 Ok((
528 LoadQueryResult::Rows(result),
529 QueryExecutePhaseAttribution {
530 executor_invocation_local_instructions,
531 ..Self::empty_query_execute_phase_attribution()
532 },
533 0,
534 ))
535 }
536 }
537 }
538 }();
539 let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
540 let execute_local_instructions = execute_phase_attribution
541 .executor_invocation_local_instructions
542 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
543 let total_local_instructions =
544 compile_local_instructions.saturating_add(execute_local_instructions);
545
546 Ok((
547 result,
548 QueryExecutionAttribution {
549 compile_local_instructions,
550 plan_lookup_local_instructions,
551 executor_invocation_local_instructions: execute_phase_attribution
552 .executor_invocation_local_instructions,
553 response_finalization_local_instructions: execute_phase_attribution
554 .response_finalization_local_instructions,
555 runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
556 finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
557 direct_data_row_scan_local_instructions: execute_phase_attribution
558 .direct_data_row_scan_local_instructions,
559 direct_data_row_key_stream_local_instructions: execute_phase_attribution
560 .direct_data_row_key_stream_local_instructions,
561 direct_data_row_row_read_local_instructions: execute_phase_attribution
562 .direct_data_row_row_read_local_instructions,
563 direct_data_row_key_encode_local_instructions: execute_phase_attribution
564 .direct_data_row_key_encode_local_instructions,
565 direct_data_row_store_get_local_instructions: execute_phase_attribution
566 .direct_data_row_store_get_local_instructions,
567 direct_data_row_order_window_local_instructions: execute_phase_attribution
568 .direct_data_row_order_window_local_instructions,
569 direct_data_row_page_window_local_instructions: execute_phase_attribution
570 .direct_data_row_page_window_local_instructions,
571 grouped_stream_local_instructions: execute_phase_attribution
572 .grouped_stream_local_instructions,
573 grouped_fold_local_instructions: execute_phase_attribution
574 .grouped_fold_local_instructions,
575 grouped_finalize_local_instructions: execute_phase_attribution
576 .grouped_finalize_local_instructions,
577 grouped_count_borrowed_hash_computations: execute_phase_attribution
578 .grouped_count
579 .borrowed_hash_computations,
580 grouped_count_bucket_candidate_checks: execute_phase_attribution
581 .grouped_count
582 .bucket_candidate_checks,
583 grouped_count_existing_group_hits: execute_phase_attribution
584 .grouped_count
585 .existing_group_hits,
586 grouped_count_new_group_inserts: execute_phase_attribution
587 .grouped_count
588 .new_group_inserts,
589 grouped_count_row_materialization_local_instructions: execute_phase_attribution
590 .grouped_count
591 .row_materialization_local_instructions,
592 grouped_count_group_lookup_local_instructions: execute_phase_attribution
593 .grouped_count
594 .group_lookup_local_instructions,
595 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
596 .grouped_count
597 .existing_group_update_local_instructions,
598 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
599 .grouped_count
600 .new_group_insert_local_instructions,
601 response_decode_local_instructions,
602 execute_local_instructions,
603 total_local_instructions,
604 shared_query_plan_cache_hits: cache_attribution.hits,
605 shared_query_plan_cache_misses: cache_attribution.misses,
606 ..QueryExecutionAttribution::default()
607 },
608 ))
609 }
610
611 #[doc(hidden)]
614 pub fn execute_query_result<E>(
615 &self,
616 query: &Query<E>,
617 ) -> Result<LoadQueryResult<E>, QueryError>
618 where
619 E: PersistedRow<Canister = C> + EntityValue,
620 {
621 if query.has_grouping() {
622 return self
623 .execute_grouped(query, None)
624 .map(LoadQueryResult::Grouped);
625 }
626
627 self.execute_query(query).map(LoadQueryResult::Rows)
628 }
629
630 #[doc(hidden)]
632 pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
633 where
634 E: PersistedRow<Canister = C> + EntityValue,
635 {
636 if !query.mode().is_delete() {
638 return Err(QueryError::unsupported_query(
639 "delete count execution requires delete query mode",
640 ));
641 }
642
643 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
647
648 self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
650 .map_err(QueryError::execute)
651 }
652
653 pub(in crate::db) fn execute_query_dyn<E>(
658 &self,
659 mode: QueryMode,
660 plan: PreparedExecutionPlan<E>,
661 ) -> Result<EntityResponse<E>, QueryError>
662 where
663 E: PersistedRow<Canister = C> + EntityValue,
664 {
665 let result = match mode {
666 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
667 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
668 };
669
670 result.map_err(QueryError::execute)
671 }
672
673 pub(in crate::db) fn execute_load_query_with<E, T>(
676 &self,
677 query: &Query<E>,
678 op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
679 ) -> Result<T, QueryError>
680 where
681 E: PersistedRow<Canister = C> + EntityValue,
682 {
683 let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
684
685 self.with_metrics(|| op(self.load_executor::<E>(), plan))
686 .map_err(QueryError::execute)
687 }
688
689 fn execute_scalar_terminal_boundary<E>(
692 &self,
693 query: &Query<E>,
694 request: ScalarTerminalBoundaryRequest,
695 ) -> Result<ScalarTerminalBoundaryOutput, QueryError>
696 where
697 E: PersistedRow<Canister = C> + EntityValue,
698 {
699 self.execute_load_query_with(query, move |load, plan| {
700 load.execute_scalar_terminal_request(plan, request)
701 })
702 }
703
704 fn execute_scalar_projection_boundary<E>(
707 &self,
708 query: &Query<E>,
709 target_field: FieldSlot,
710 request: ScalarProjectionBoundaryRequest,
711 ) -> Result<ScalarProjectionBoundaryOutput, QueryError>
712 where
713 E: PersistedRow<Canister = C> + EntityValue,
714 {
715 self.execute_load_query_with(query, move |load, plan| {
716 load.execute_scalar_projection_boundary(plan, target_field, request)
717 })
718 }
719
720 pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
723 &self,
724 query: &Query<E>,
725 strategy: PreparedFluentExistingRowsTerminalStrategy,
726 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
727 where
728 E: PersistedRow<Canister = C> + EntityValue,
729 {
730 let (request, output_shape) = strategy.into_executor_request();
731 let output = self.execute_scalar_terminal_boundary(query, request)?;
732
733 match output_shape {
734 PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => output
735 .into_count()
736 .map(FluentScalarTerminalOutput::Count)
737 .map_err(QueryError::execute),
738 PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => output
739 .into_exists()
740 .map(FluentScalarTerminalOutput::Exists)
741 .map_err(QueryError::execute),
742 }
743 }
744
745 pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
748 &self,
749 query: &Query<E>,
750 strategy: PreparedFluentScalarTerminalStrategy,
751 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
752 where
753 E: PersistedRow<Canister = C> + EntityValue,
754 {
755 let request = strategy.into_executor_request();
756
757 self.execute_scalar_terminal_boundary(query, request)?
758 .into_id::<E>()
759 .map(FluentScalarTerminalOutput::Id)
760 .map_err(QueryError::execute)
761 }
762
763 pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
766 &self,
767 query: &Query<E>,
768 strategy: PreparedFluentOrderSensitiveTerminalStrategy,
769 ) -> Result<FluentScalarTerminalOutput<E>, QueryError>
770 where
771 E: PersistedRow<Canister = C> + EntityValue,
772 {
773 let (request, returns_id_pair) = strategy.into_executor_request();
774 let output = self.execute_scalar_terminal_boundary(query, request)?;
775
776 if returns_id_pair {
777 return output
778 .into_id_pair::<E>()
779 .map(FluentScalarTerminalOutput::IdPair)
780 .map_err(QueryError::execute);
781 }
782
783 output
784 .into_id::<E>()
785 .map(FluentScalarTerminalOutput::Id)
786 .map_err(QueryError::execute)
787 }
788
789 pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
792 &self,
793 query: &Query<E>,
794 strategy: PreparedFluentNumericFieldStrategy,
795 ) -> Result<Option<Decimal>, QueryError>
796 where
797 E: PersistedRow<Canister = C> + EntityValue,
798 {
799 let (target_field, request) = strategy.into_executor_request();
800
801 self.execute_load_query_with(query, move |load, plan| {
802 load.execute_numeric_field_boundary(plan, target_field, request)
803 })
804 }
805
806 pub(in crate::db) fn execute_fluent_projection_terminal<E>(
809 &self,
810 query: &Query<E>,
811 strategy: PreparedFluentProjectionStrategy,
812 ) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
813 where
814 E: PersistedRow<Canister = C> + EntityValue,
815 {
816 let (target_field, request, output_shape) = strategy.into_executor_request();
817 let output = self.execute_scalar_projection_boundary(query, target_field, request)?;
818
819 match output_shape {
820 PreparedFluentProjectionRuntimeRequest::Values
821 | PreparedFluentProjectionRuntimeRequest::DistinctValues => output
822 .into_values()
823 .map(FluentProjectionTerminalOutput::Values)
824 .map_err(QueryError::execute),
825 PreparedFluentProjectionRuntimeRequest::CountDistinct => output
826 .into_count()
827 .map(FluentProjectionTerminalOutput::Count)
828 .map_err(QueryError::execute),
829 PreparedFluentProjectionRuntimeRequest::ValuesWithIds => output
830 .into_values_with_ids::<E>()
831 .map(FluentProjectionTerminalOutput::ValuesWithIds)
832 .map_err(QueryError::execute),
833 PreparedFluentProjectionRuntimeRequest::TerminalValue { .. } => output
834 .into_terminal_value()
835 .map(FluentProjectionTerminalOutput::TerminalValue)
836 .map_err(QueryError::execute),
837 }
838 }
839
840 pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
843 where
844 E: PersistedRow<Canister = C> + EntityValue,
845 {
846 self.execute_load_query_with(query, |load, plan| load.bytes(plan))
847 }
848
849 pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
851 &self,
852 query: &Query<E>,
853 target_slot: FieldSlot,
854 ) -> Result<u64, QueryError>
855 where
856 E: PersistedRow<Canister = C> + EntityValue,
857 {
858 self.execute_load_query_with(query, move |load, plan| {
859 load.bytes_by_slot(plan, target_slot)
860 })
861 }
862
863 pub(in crate::db) fn execute_fluent_take<E>(
865 &self,
866 query: &Query<E>,
867 take_count: u32,
868 ) -> Result<EntityResponse<E>, QueryError>
869 where
870 E: PersistedRow<Canister = C> + EntityValue,
871 {
872 self.execute_load_query_with(query, move |load, plan| load.take(plan, take_count))
873 }
874
875 pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
877 &self,
878 query: &Query<E>,
879 target_slot: FieldSlot,
880 take_count: u32,
881 descending: bool,
882 ) -> Result<EntityResponse<E>, QueryError>
883 where
884 E: PersistedRow<Canister = C> + EntityValue,
885 {
886 self.execute_load_query_with(query, move |load, plan| {
887 if descending {
888 load.top_k_by_slot(plan, target_slot, take_count)
889 } else {
890 load.bottom_k_by_slot(plan, target_slot, take_count)
891 }
892 })
893 }
894
895 pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
897 &self,
898 query: &Query<E>,
899 target_slot: FieldSlot,
900 take_count: u32,
901 descending: bool,
902 ) -> Result<Vec<Value>, QueryError>
903 where
904 E: PersistedRow<Canister = C> + EntityValue,
905 {
906 self.execute_load_query_with(query, move |load, plan| {
907 if descending {
908 load.top_k_by_values_slot(plan, target_slot, take_count)
909 } else {
910 load.bottom_k_by_values_slot(plan, target_slot, take_count)
911 }
912 })
913 }
914
915 pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
917 &self,
918 query: &Query<E>,
919 target_slot: FieldSlot,
920 take_count: u32,
921 descending: bool,
922 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
923 where
924 E: PersistedRow<Canister = C> + EntityValue,
925 {
926 self.execute_load_query_with(query, move |load, plan| {
927 if descending {
928 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
929 } else {
930 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
931 }
932 })
933 }
934
935 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
940 where
941 E: EntityKind<Canister = C>,
942 {
943 let (prepared_plan, cache_attribution) =
944 self.cached_prepared_query_plan_for_entity::<E>(query)?;
945 let logical_plan = prepared_plan.logical_plan();
946 let explain = logical_plan.explain();
947 let plan_hash = logical_plan.fingerprint().to_string();
948 let executable_access = prepared_plan.access().executable_contract();
949 let access_strategy = summarize_executable_access_plan(&executable_access);
950 let execution_family = match query.mode() {
951 QueryMode::Load(_) => Some(trace_execution_family_from_executor(
952 prepared_plan
953 .execution_family()
954 .map_err(QueryError::execute)?,
955 )),
956 QueryMode::Delete(_) => None,
957 };
958 let reuse = query_plan_cache_reuse_event(cache_attribution);
959
960 Ok(QueryTracePlan::new(
961 plan_hash,
962 access_strategy,
963 execution_family,
964 reuse,
965 explain,
966 ))
967 }
968
969 pub(crate) fn execute_load_query_paged_with_trace<E>(
971 &self,
972 query: &Query<E>,
973 cursor_token: Option<&str>,
974 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
975 where
976 E: PersistedRow<Canister = C> + EntityValue,
977 {
978 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
980 Self::ensure_scalar_paged_execution_family(
981 plan.execution_family().map_err(QueryError::execute)?,
982 )?;
983
984 let cursor_bytes = decode_optional_cursor_token(cursor_token)
986 .map_err(QueryError::from_cursor_plan_error)?;
987 let cursor = plan
988 .prepare_cursor(cursor_bytes.as_deref())
989 .map_err(query_error_from_executor_plan_error)?;
990
991 let (page, trace) = self
993 .with_metrics(|| {
994 self.load_executor::<E>()
995 .execute_paged_with_cursor_traced(plan, cursor)
996 })
997 .map_err(QueryError::execute)?;
998 finalize_scalar_paged_execution(page, trace)
999 }
1000
1001 pub(in crate::db) fn execute_grouped<E>(
1006 &self,
1007 query: &Query<E>,
1008 cursor_token: Option<&str>,
1009 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
1010 where
1011 E: PersistedRow<Canister = C> + EntityValue,
1012 {
1013 let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
1015
1016 let (result, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
1019
1020 finalize_structural_grouped_projection_result(result, trace)
1021 }
1022
1023 pub(in crate::db::session) fn execute_grouped_plan_with<E, T>(
1026 &self,
1027 plan: PreparedExecutionPlan<E>,
1028 cursor_token: Option<&str>,
1029 op: impl FnOnce(
1030 LoadExecutor<E>,
1031 PreparedExecutionPlan<E>,
1032 crate::db::cursor::GroupedPlannedCursor,
1033 ) -> Result<T, InternalError>,
1034 ) -> Result<T, QueryError>
1035 where
1036 E: PersistedRow<Canister = C> + EntityValue,
1037 {
1038 Self::ensure_grouped_execution_family(
1040 plan.execution_family().map_err(QueryError::execute)?,
1041 )?;
1042
1043 let cursor = decode_optional_grouped_cursor_token(cursor_token)
1045 .map_err(QueryError::from_cursor_plan_error)?;
1046 let cursor = plan
1047 .prepare_grouped_cursor_token(cursor)
1048 .map_err(query_error_from_executor_plan_error)?;
1049
1050 self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
1053 .map_err(QueryError::execute)
1054 }
1055
1056 pub(in crate::db::session) fn execute_grouped_plan_with_trace<E>(
1058 &self,
1059 plan: PreparedExecutionPlan<E>,
1060 cursor_token: Option<&str>,
1061 ) -> Result<(StructuralGroupedProjectionResult, Option<ExecutionTrace>), QueryError>
1062 where
1063 E: PersistedRow<Canister = C> + EntityValue,
1064 {
1065 self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
1066 executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
1067 })
1068 }
1069}