mod cache;
#[cfg(feature = "diagnostics")]
use crate::db::diagnostics::measure_local_instruction_delta as measure_query_stage;
#[cfg(feature = "diagnostics")]
use crate::db::executor::{
GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
};
use crate::{
db::{
DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
TraceExecutionFamily,
access::summarize_executable_access_plan,
cursor::{
CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
},
diagnostics::ExecutionTrace,
executor::{
ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
StructuralGroupedProjectionResult,
},
query::builder::{
PreparedFluentAggregateExplainStrategy,
PreparedFluentExistingRowsTerminalRuntimeRequest,
PreparedFluentExistingRowsTerminalStrategy, PreparedFluentNumericFieldStrategy,
PreparedFluentOrderSensitiveTerminalStrategy, PreparedFluentProjectionRuntimeRequest,
PreparedFluentProjectionStrategy, PreparedFluentScalarTerminalStrategy,
},
query::explain::{
ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
},
query::fluent::load::{FluentProjectionTerminalOutput, FluentScalarTerminalOutput},
query::{
intent::{CompiledQuery, PlannedQuery},
plan::{AccessPlannedQuery, FieldSlot, QueryMode, VisibleIndexes},
},
session::{finalize_scalar_paged_execution, finalize_structural_grouped_projection_result},
},
error::InternalError,
traits::{CanisterKind, EntityKind, EntityValue},
types::{Decimal, Id},
value::Value,
};
pub(in crate::db) use cache::QueryPlanCacheAttribution;
#[cfg(test)]
pub(in crate::db) use cache::QueryPlanVisibility;
pub(in crate::db::session) use cache::query_plan_cache_reuse_event;
#[cfg(feature = "diagnostics")]
use candid::CandidType;
#[cfg(feature = "diagnostics")]
use serde::Deserialize;
/// Translate the executor-side execution family into its trace-surface counterpart.
///
/// The mapping is one-to-one by variant name. The match deliberately has no
/// wildcard arm, so a new `ExecutionFamily` variant fails to compile here until
/// it is given a trace representation.
const fn trace_execution_family_from_executor(family: ExecutionFamily) -> TraceExecutionFamily {
    match family {
        ExecutionFamily::Grouped => TraceExecutionFamily::Grouped,
        ExecutionFamily::Ordered => TraceExecutionFamily::Ordered,
        ExecutionFamily::PrimaryKey => TraceExecutionFamily::PrimaryKey,
    }
}
/// Convert an executor plan error into the session-level `QueryError`.
///
/// `ExecutorPlanError` is matched through an irrefutable pattern on its `Cursor`
/// variant, so this stops compiling (rather than silently mis-mapping) if another
/// variant is ever added. The payload is dereferenced before conversion.
pub(in crate::db::session) fn query_error_from_executor_plan_error(
    err: ExecutorPlanError,
) -> QueryError {
    let ExecutorPlanError::Cursor(cursor_err) = err;

    QueryError::from_cursor_plan_error(*cursor_err)
}
/// Flat, serializable attribution record returned alongside the result of
/// `DbSession::execute_query_result_with_attribution`.
///
/// Only compiled with the `diagnostics` feature. Every `*_local_instructions`
/// field is a local-instruction delta; counters that do not apply to the executed
/// path (for example grouped counters on a scalar load) are reported as `0`.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
/// Compile cost. The producer currently sets this to the same value as
/// `plan_lookup_local_instructions`.
pub compile_local_instructions: u64,
/// Instructions measured around the cached prepared-plan lookup.
pub plan_lookup_local_instructions: u64,
/// Instructions measured around the executor call itself.
pub executor_invocation_local_instructions: u64,
/// Instructions measured around response finalization (populated on the grouped
/// path; `0` on the scalar load and delete paths).
pub response_finalization_local_instructions: u64,
/// Runtime-phase instructions. For grouped execution this is the saturating sum
/// of the stream and fold phases; for scalar loads it is the executor-reported value.
pub runtime_local_instructions: u64,
/// Finalize-phase instructions as reported by the executor.
pub finalize_local_instructions: u64,
// --- Direct data-row counters: copied from the scalar phase attribution and
// --- reported as `0` on the grouped and delete paths.
pub direct_data_row_scan_local_instructions: u64,
pub direct_data_row_key_stream_local_instructions: u64,
pub direct_data_row_row_read_local_instructions: u64,
pub direct_data_row_key_encode_local_instructions: u64,
pub direct_data_row_store_get_local_instructions: u64,
pub direct_data_row_order_window_local_instructions: u64,
pub direct_data_row_page_window_local_instructions: u64,
// --- Grouped phase counters: copied from the grouped phase attribution and
// --- reported as `0` on the scalar load and delete paths.
pub grouped_stream_local_instructions: u64,
pub grouped_fold_local_instructions: u64,
pub grouped_finalize_local_instructions: u64,
// --- Grouped-count counters: copied field-for-field from the executor's
// --- `GroupedCountAttribution`.
pub grouped_count_borrowed_hash_computations: u64,
pub grouped_count_bucket_candidate_checks: u64,
pub grouped_count_existing_group_hits: u64,
pub grouped_count_new_group_inserts: u64,
pub grouped_count_row_materialization_local_instructions: u64,
pub grouped_count_group_lookup_local_instructions: u64,
pub grouped_count_existing_group_update_local_instructions: u64,
pub grouped_count_new_group_insert_local_instructions: u64,
/// Response-decode instructions reported by the scalar load executor; `0` on the
/// grouped and delete paths.
pub response_decode_local_instructions: u64,
/// Saturating sum of `executor_invocation_local_instructions` and
/// `response_finalization_local_instructions`.
pub execute_local_instructions: u64,
/// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
pub total_local_instructions: u64,
/// Shared query-plan cache hits attributed to this call.
pub shared_query_plan_cache_hits: u64,
/// Shared query-plan cache misses attributed to this call.
pub shared_query_plan_cache_misses: u64,
}
/// Internal, execute-phase-only attribution carried between the executor call
/// and the public `QueryExecutionAttribution` record.
///
/// Scalar and grouped execution each fill the subset of counters that applies to
/// them and leave the rest at zero; `grouped_count` keeps the executor's
/// `GroupedCountAttribution` intact until it is flattened into the public record.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
// Measured by the session around the executor call / response finalization.
executor_invocation_local_instructions: u64,
response_finalization_local_instructions: u64,
// Phase totals reported by the executor.
runtime_local_instructions: u64,
finalize_local_instructions: u64,
// Scalar (direct data-row) counters; zero for grouped execution.
direct_data_row_scan_local_instructions: u64,
direct_data_row_key_stream_local_instructions: u64,
direct_data_row_row_read_local_instructions: u64,
direct_data_row_key_encode_local_instructions: u64,
direct_data_row_store_get_local_instructions: u64,
direct_data_row_order_window_local_instructions: u64,
direct_data_row_page_window_local_instructions: u64,
// Grouped counters; zero for scalar execution.
grouped_stream_local_instructions: u64,
grouped_fold_local_instructions: u64,
grouped_finalize_local_instructions: u64,
grouped_count: GroupedCountAttribution,
}
impl<C: CanisterKind> DbSession<C> {
/// Build an execute-phase attribution in which every counter is zero and the
/// grouped-count attribution is `GroupedCountAttribution::none()`.
///
/// Serves as the struct-update base for paths that only measure a subset of
/// the counters.
#[cfg(feature = "diagnostics")]
const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
    // One named zero keeps the "nothing was measured" intent explicit per field.
    const UNMEASURED: u64 = 0;

    QueryExecutePhaseAttribution {
        executor_invocation_local_instructions: UNMEASURED,
        response_finalization_local_instructions: UNMEASURED,
        runtime_local_instructions: UNMEASURED,
        finalize_local_instructions: UNMEASURED,
        direct_data_row_scan_local_instructions: UNMEASURED,
        direct_data_row_key_stream_local_instructions: UNMEASURED,
        direct_data_row_row_read_local_instructions: UNMEASURED,
        direct_data_row_key_encode_local_instructions: UNMEASURED,
        direct_data_row_store_get_local_instructions: UNMEASURED,
        direct_data_row_order_window_local_instructions: UNMEASURED,
        direct_data_row_page_window_local_instructions: UNMEASURED,
        grouped_stream_local_instructions: UNMEASURED,
        grouped_fold_local_instructions: UNMEASURED,
        grouped_finalize_local_instructions: UNMEASURED,
        grouped_count: GroupedCountAttribution::none(),
    }
}
/// Lift a scalar executor phase attribution into the session-level shape.
///
/// The scalar runtime, finalize, and direct data-row counters are carried over
/// unchanged. Response finalization and all grouped counters are reported as
/// zero because the scalar path does not measure them.
#[cfg(feature = "diagnostics")]
const fn scalar_query_execute_phase_attribution(
    phase: ScalarExecutePhaseAttribution,
    executor_invocation_local_instructions: u64,
) -> QueryExecutePhaseAttribution {
    // Pull out the counters that are forwarded one-to-one.
    let ScalarExecutePhaseAttribution {
        runtime_local_instructions,
        finalize_local_instructions,
        direct_data_row_scan_local_instructions,
        direct_data_row_key_stream_local_instructions,
        direct_data_row_row_read_local_instructions,
        direct_data_row_key_encode_local_instructions,
        direct_data_row_store_get_local_instructions,
        direct_data_row_order_window_local_instructions,
        direct_data_row_page_window_local_instructions,
        ..
    } = phase;

    QueryExecutePhaseAttribution {
        executor_invocation_local_instructions,
        response_finalization_local_instructions: 0,
        runtime_local_instructions,
        finalize_local_instructions,
        direct_data_row_scan_local_instructions,
        direct_data_row_key_stream_local_instructions,
        direct_data_row_row_read_local_instructions,
        direct_data_row_key_encode_local_instructions,
        direct_data_row_store_get_local_instructions,
        direct_data_row_order_window_local_instructions,
        direct_data_row_page_window_local_instructions,
        grouped_stream_local_instructions: 0,
        grouped_fold_local_instructions: 0,
        grouped_finalize_local_instructions: 0,
        grouped_count: GroupedCountAttribution::none(),
    }
}
/// Lift a grouped executor phase attribution into the session-level shape.
///
/// The generic runtime counter is the saturating sum of the grouped stream and
/// fold phases, and the generic finalize counter mirrors the grouped finalize
/// phase. All direct data-row counters are zero on this path.
#[cfg(feature = "diagnostics")]
const fn grouped_query_execute_phase_attribution(
    phase: GroupedExecutePhaseAttribution,
    executor_invocation_local_instructions: u64,
    response_finalization_local_instructions: u64,
) -> QueryExecutePhaseAttribution {
    let stream = phase.stream_local_instructions;
    let fold = phase.fold_local_instructions;
    let finalize = phase.finalize_local_instructions;

    QueryExecutePhaseAttribution {
        executor_invocation_local_instructions,
        response_finalization_local_instructions,
        runtime_local_instructions: stream.saturating_add(fold),
        finalize_local_instructions: finalize,
        direct_data_row_scan_local_instructions: 0,
        direct_data_row_key_stream_local_instructions: 0,
        direct_data_row_row_read_local_instructions: 0,
        direct_data_row_key_encode_local_instructions: 0,
        direct_data_row_store_get_local_instructions: 0,
        direct_data_row_order_window_local_instructions: 0,
        direct_data_row_page_window_local_instructions: 0,
        grouped_stream_local_instructions: stream,
        grouped_fold_local_instructions: fold,
        grouped_finalize_local_instructions: finalize,
        grouped_count: phase.grouped_count,
    }
}
/// Produce a `CompiledQuery` for `query` through the shared query-plan cache.
///
/// Delegates to `map_cached_shared_query_plan_for_entity`, passing
/// `CompiledQuery::from_plan` as the mapping step.
pub(in crate::db) fn compile_query_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
) -> Result<CompiledQuery<E>, QueryError>
where
    E: EntityKind<Canister = C>,
{
    let into_compiled = CompiledQuery::<E>::from_plan;

    self.map_cached_shared_query_plan_for_entity(query, into_compiled)
}
/// Produce a `PlannedQuery` for `query` through the shared query-plan cache.
///
/// Delegates to `map_cached_shared_query_plan_for_entity`, passing
/// `PlannedQuery::from_plan` as the mapping step.
pub(in crate::db) fn planned_query_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
) -> Result<PlannedQuery<E>, QueryError>
where
    E: EntityKind<Canister = C>,
{
    let into_planned = PlannedQuery::<E>::from_plan;

    self.map_cached_shared_query_plan_for_entity(query, into_planned)
}
/// Return an owned copy of the logical plan held by the cached shared plan.
///
/// The cache attribution returned by the lookup is discarded.
fn cached_logical_query_plan<E>(
    &self,
    query: &Query<E>,
) -> Result<AccessPlannedQuery, QueryError>
where
    E: EntityKind<Canister = C>,
{
    let shared_plan = self.cached_shared_query_plan_for_entity::<E>(query)?.0;
    let logical = shared_plan.logical_plan().clone();

    Ok(logical)
}
/// Clone the cached logical plan and finalize its access choice against the
/// supplied visible indexes, for use by the execution-explain surfaces.
///
/// Returns the finalized clone together with the cache attribution from the
/// lookup; the cached plan itself is not mutated because finalization runs on
/// the clone.
fn cached_finalized_execution_explain_plan<E>(
    &self,
    query: &Query<E>,
    visible_indexes: &VisibleIndexes<'_>,
) -> Result<(AccessPlannedQuery, QueryPlanCacheAttribution), QueryError>
where
    E: EntityKind<Canister = C>,
{
    let (shared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;

    let mut finalized = shared_plan.logical_plan().clone();
    finalized.finalize_access_choice_for_model_with_indexes(
        query.structural().model(),
        visible_indexes.as_slice(),
    );

    Ok((finalized, attribution))
}
/// Explain `query` by rendering the cached logical plan's `explain()` output.
pub(in crate::db) fn explain_query_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
) -> Result<ExplainPlan, QueryError>
where
    E: EntityKind<Canister = C>,
{
    self.cached_logical_query_plan(query)
        .map(|logical| logical.explain())
}
/// Return the string form of the cached logical plan's fingerprint for `query`.
pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
) -> Result<String, QueryError>
where
    E: EntityKind<Canister = C>,
{
    self.cached_logical_query_plan(query)
        .map(|logical| logical.fingerprint().to_string())
}
/// Build the execution-explain descriptor for `query`.
///
/// Inside the scope provided by `with_query_visible_indexes`, the cached plan is
/// cloned and finalized against the visible indexes, and the descriptor is then
/// derived from that finalized plan. The cache attribution is not used here.
pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
) -> Result<ExplainExecutionNodeDescriptor, QueryError>
where
    E: EntityValue + EntityKind<Canister = C>,
{
    self.with_query_visible_indexes(query, |scoped_query, indexes| {
        let finalized = self
            .cached_finalized_execution_explain_plan::<E>(scoped_query, indexes)?
            .0;
        let structural = scoped_query.structural();

        structural.explain_execution_descriptor_from_plan(&finalized)
    })
}
/// Render the verbose textual execution diagnostics for `query`.
///
/// The plan is finalized against the visible indexes, the plan-cache reuse event
/// derived from the lookup's attribution is attached to the diagnostics, and the
/// descriptor mutator is a no-op.
pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
) -> Result<String, QueryError>
where
    E: EntityValue + EntityKind<Canister = C>,
{
    self.with_query_visible_indexes(query, |scoped_query, indexes| {
        let (finalized, attribution) =
            self.cached_finalized_execution_explain_plan::<E>(scoped_query, indexes)?;

        let structural = scoped_query.structural();
        let reuse_event = Some(query_plan_cache_reuse_event(attribution));

        structural
            .finalized_execution_diagnostics_from_plan_with_descriptor_mutator(
                &finalized,
                reuse_event,
                |_| {},
            )
            .map(|report| report.render_text_verbose())
    })
}
/// Explain a prepared aggregate terminal for `query` using the visible indexes
/// supplied by `with_query_visible_indexes`.
///
/// This path asks the query to explain itself directly; it does not go through
/// the cached-plan helpers.
pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
    &self,
    query: &Query<E>,
    strategy: &S,
) -> Result<ExplainAggregateTerminalPlan, QueryError>
where
    E: EntityValue + EntityKind<Canister = C>,
    S: PreparedFluentAggregateExplainStrategy,
{
    self.with_query_visible_indexes(query, |scoped_query, indexes| {
        scoped_query.explain_prepared_aggregate_terminal_with_visible_indexes(indexes, strategy)
    })
}
/// Explain the `bytes_by` terminal over `target_field` for `query`, using the
/// visible indexes supplied by `with_query_visible_indexes`.
pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
    target_field: &str,
) -> Result<ExplainExecutionNodeDescriptor, QueryError>
where
    E: EntityValue + EntityKind<Canister = C>,
{
    self.with_query_visible_indexes(query, |scoped_query, indexes| {
        scoped_query.explain_bytes_by_with_visible_indexes(indexes, target_field)
    })
}
/// Explain a prepared projection terminal for `query`, using the visible indexes
/// supplied by `with_query_visible_indexes`.
pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
    &self,
    query: &Query<E>,
    strategy: &PreparedFluentProjectionStrategy,
) -> Result<ExplainExecutionNodeDescriptor, QueryError>
where
    E: EntityValue + EntityKind<Canister = C>,
{
    self.with_query_visible_indexes(query, |scoped_query, indexes| {
        scoped_query.explain_prepared_projection_terminal_with_visible_indexes(indexes, strategy)
    })
}
/// Check that a plan's execution family may be used for scalar paged execution.
///
/// Only `Ordered` plans are accepted. `PrimaryKey` plans are rejected with the
/// cursor "requires explicit or grouped ordering" message, and `Grouped` plans
/// are rejected with a pointer to `execute()`. Both rejections are invariant errors.
fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
    match family {
        ExecutionFamily::Ordered => Ok(()),
        ExecutionFamily::Grouped => Err(QueryError::invariant(
            "grouped queries execute via execute(), not page().execute()",
        )),
        ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
            CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
        )),
    }
}
/// Check that a plan's execution family is `Grouped`.
///
/// Any other family yields an invariant error. The match stays exhaustive so a
/// newly added family has to be classified here explicitly.
fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
    match family {
        ExecutionFamily::Grouped => return Ok(()),
        ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => {}
    }

    Err(QueryError::invariant(
        "grouped execution requires grouped logical plans",
    ))
}
/// Execute `query` and return its entity response.
///
/// The query mode is read first, the prepared plan is then taken from the plan
/// cache (its cache attribution is ignored), and both are handed to
/// `execute_query_dyn`.
pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let mode = query.mode();
    let prepared = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;

    self.execute_query_dyn(mode, prepared)
}
/// Execute `query` and return its result together with a flat
/// `QueryExecutionAttribution` describing where local instructions were spent.
///
/// Diagnostics-only. The work is measured in stages:
/// 1. prepared-plan lookup (reported both as `plan_lookup_*` and `compile_*`);
/// 2. executor invocation, on one of three paths — grouped, scalar load, or delete;
/// 3. response finalization (grouped path only).
///
/// `execute_local_instructions` is invocation + response finalization, and
/// `total_local_instructions` is compile + execute; both sums saturate.
///
/// # Errors
/// Returns the first `QueryError` raised by plan lookup, execution, or grouped
/// response finalization; no attribution is returned on failure.
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
#[expect(
clippy::too_many_lines,
reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
)]
#[expect(
clippy::needless_update,
reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
)]
pub fn execute_query_result_with_attribution<E>(
&self,
query: &Query<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
// Stage 1: measure the cached prepared-plan lookup. The measurement is taken
// before the lookup result is unwrapped, so a failed lookup returns early below.
let (plan_lookup_local_instructions, plan_and_cache) =
measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
let (plan, cache_attribution) = plan_and_cache?;
// Compile cost is reported as the plan-lookup cost; no separate compile stage
// is measured in this function.
let compile_local_instructions = plan_lookup_local_instructions;
// Stage 2: run the executor inside an immediately-invoked closure so each path
// can use `?` while still producing the common
// (result, phase attribution, response-decode instructions) triple.
let result =
|| -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
if query.has_grouping() {
// Grouped path: executed with no cursor token (`None`).
let (executor_invocation_local_instructions, grouped_page) =
measure_query_stage(|| {
self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
executor
.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
plan, cursor,
)
})
});
let (result, trace, phase_attribution) = grouped_page?;
// Response finalization is measured separately from the executor call.
let (response_finalization_local_instructions, grouped) =
measure_query_stage(|| {
finalize_structural_grouped_projection_result(result, trace)
});
let grouped = grouped?;
Ok((
LoadQueryResult::Grouped(grouped),
Self::grouped_query_execute_phase_attribution(
phase_attribution,
executor_invocation_local_instructions,
response_finalization_local_instructions,
),
// The grouped path reports no response-decode instructions.
0,
))
} else {
match query.mode() {
QueryMode::Load(_) => {
// Scalar load path.
// NOTE(review): the load executor is called directly here, whereas
// `execute_query_dyn` wraps the same executor in `with_metrics` —
// confirm the difference is intentional for the diagnostics path.
let (executor_invocation_local_instructions, executed) =
measure_query_stage(|| {
self.load_executor::<E>()
.execute_with_phase_attribution(plan)
.map_err(QueryError::execute)
});
let (rows, phase_attribution, response_decode_local_instructions) =
executed?;
Ok((
LoadQueryResult::Rows(rows),
Self::scalar_query_execute_phase_attribution(
phase_attribution,
executor_invocation_local_instructions,
),
response_decode_local_instructions,
))
}
QueryMode::Delete(_) => {
// Delete path: only the executor-invocation counter is populated;
// every other phase counter stays at its empty value.
let (executor_invocation_local_instructions, result) =
measure_query_stage(|| self.execute_query_dyn(query.mode(), plan));
let result = result?;
Ok((
LoadQueryResult::Rows(result),
QueryExecutePhaseAttribution {
executor_invocation_local_instructions,
..Self::empty_query_execute_phase_attribution()
},
0,
))
}
}
}
}();
let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
// Stage 3: derive the aggregate counters.
// NOTE(review): `response_decode_local_instructions` is not added here; it is
// reported from inside the measured executor call, so it is presumably already
// part of the invocation figure — confirm.
let execute_local_instructions = execute_phase_attribution
.executor_invocation_local_instructions
.saturating_add(execute_phase_attribution.response_finalization_local_instructions);
let total_local_instructions =
compile_local_instructions.saturating_add(execute_local_instructions);
// Flatten the internal phase attribution into the public record.
Ok((
result,
QueryExecutionAttribution {
compile_local_instructions,
plan_lookup_local_instructions,
executor_invocation_local_instructions: execute_phase_attribution
.executor_invocation_local_instructions,
response_finalization_local_instructions: execute_phase_attribution
.response_finalization_local_instructions,
runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
direct_data_row_scan_local_instructions: execute_phase_attribution
.direct_data_row_scan_local_instructions,
direct_data_row_key_stream_local_instructions: execute_phase_attribution
.direct_data_row_key_stream_local_instructions,
direct_data_row_row_read_local_instructions: execute_phase_attribution
.direct_data_row_row_read_local_instructions,
direct_data_row_key_encode_local_instructions: execute_phase_attribution
.direct_data_row_key_encode_local_instructions,
direct_data_row_store_get_local_instructions: execute_phase_attribution
.direct_data_row_store_get_local_instructions,
direct_data_row_order_window_local_instructions: execute_phase_attribution
.direct_data_row_order_window_local_instructions,
direct_data_row_page_window_local_instructions: execute_phase_attribution
.direct_data_row_page_window_local_instructions,
grouped_stream_local_instructions: execute_phase_attribution
.grouped_stream_local_instructions,
grouped_fold_local_instructions: execute_phase_attribution
.grouped_fold_local_instructions,
grouped_finalize_local_instructions: execute_phase_attribution
.grouped_finalize_local_instructions,
grouped_count_borrowed_hash_computations: execute_phase_attribution
.grouped_count
.borrowed_hash_computations,
grouped_count_bucket_candidate_checks: execute_phase_attribution
.grouped_count
.bucket_candidate_checks,
grouped_count_existing_group_hits: execute_phase_attribution
.grouped_count
.existing_group_hits,
grouped_count_new_group_inserts: execute_phase_attribution
.grouped_count
.new_group_inserts,
grouped_count_row_materialization_local_instructions: execute_phase_attribution
.grouped_count
.row_materialization_local_instructions,
grouped_count_group_lookup_local_instructions: execute_phase_attribution
.grouped_count
.group_lookup_local_instructions,
grouped_count_existing_group_update_local_instructions: execute_phase_attribution
.grouped_count
.existing_group_update_local_instructions,
grouped_count_new_group_insert_local_instructions: execute_phase_attribution
.grouped_count
.new_group_insert_local_instructions,
response_decode_local_instructions,
execute_local_instructions,
total_local_instructions,
shared_query_plan_cache_hits: cache_attribution.hits,
shared_query_plan_cache_misses: cache_attribution.misses,
// Default-backed tail: intentionally kept (see the `needless_update` expect
// above) so a new counter does not break this initializer.
..QueryExecutionAttribution::default()
},
))
}
/// Execute `query` and wrap the outcome in the matching `LoadQueryResult` variant.
///
/// Queries with grouping run through `execute_grouped` with no cursor token and
/// produce `LoadQueryResult::Grouped`; all other queries run through
/// `execute_query` and produce `LoadQueryResult::Rows`.
#[doc(hidden)]
pub fn execute_query_result<E>(
    &self,
    query: &Query<E>,
) -> Result<LoadQueryResult<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    if query.has_grouping() {
        self.execute_grouped(query, None)
            .map(LoadQueryResult::Grouped)
    } else {
        self.execute_query(query).map(LoadQueryResult::Rows)
    }
}
/// Execute a delete query through the delete executor's `execute_count` and
/// return the resulting `u32`.
///
/// # Errors
/// Returns an unsupported-query error when `query` is not in delete mode.
/// Plan lookup failures are propagated, and executor failures are wrapped via
/// `QueryError::execute`.
#[doc(hidden)]
pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let is_delete = query.mode().is_delete();
    if !is_delete {
        return Err(QueryError::unsupported_query(
            "delete count execution requires delete query mode",
        ));
    }

    let prepared = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
    let counted = self.with_metrics(|| self.delete_executor::<E>().execute_count(prepared));

    counted.map_err(QueryError::execute)
}
/// Run a prepared plan on the executor selected by `mode`.
///
/// `Load` dispatches to the load executor and `Delete` to the delete executor.
/// Both run inside `with_metrics`, and executor errors are wrapped via
/// `QueryError::execute`.
pub(in crate::db) fn execute_query_dyn<E>(
    &self,
    mode: QueryMode,
    plan: PreparedExecutionPlan<E>,
) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    match mode {
        QueryMode::Load(_) => self
            .with_metrics(|| self.load_executor::<E>().execute(plan))
            .map_err(QueryError::execute),
        QueryMode::Delete(_) => self
            .with_metrics(|| self.delete_executor::<E>().execute(plan))
            .map_err(QueryError::execute),
    }
}
/// Shared seam for load-side terminals.
///
/// Takes the prepared plan for `query` from the plan cache (ignoring the cache
/// attribution), then runs `op` with a fresh load executor and that plan inside
/// `with_metrics`. Errors from `op` are wrapped via `QueryError::execute`.
pub(in crate::db) fn execute_load_query_with<E, T>(
    &self,
    query: &Query<E>,
    op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
) -> Result<T, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let prepared = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
    let outcome = self.with_metrics(|| op(self.load_executor::<E>(), prepared));

    outcome.map_err(QueryError::execute)
}
/// Run one scalar terminal `request` for `query` on the load executor and return
/// the executor's boundary output.
fn execute_scalar_terminal_boundary<E>(
    &self,
    query: &Query<E>,
    request: ScalarTerminalBoundaryRequest,
) -> Result<ScalarTerminalBoundaryOutput, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        executor.execute_scalar_terminal_request(prepared, request)
    })
}
/// Run one scalar projection `request` over `target_field` for `query` on the
/// load executor and return the executor's boundary output.
fn execute_scalar_projection_boundary<E>(
    &self,
    query: &Query<E>,
    target_field: FieldSlot,
    request: ScalarProjectionBoundaryRequest,
) -> Result<ScalarProjectionBoundaryOutput, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        executor.execute_scalar_projection_boundary(prepared, target_field, request)
    })
}
/// Execute a fluent existing-rows terminal (count or exists) for `query`.
///
/// The strategy is split into the executor request and the expected output
/// shape. After execution, the boundary output is decoded according to that
/// shape, and a decode failure is wrapped via `QueryError::execute`.
pub(in crate::db) fn execute_fluent_existing_rows_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: PreparedFluentExistingRowsTerminalStrategy,
) -> Result<FluentScalarTerminalOutput<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (request, output_shape) = strategy.into_executor_request();
    let output = self.execute_scalar_terminal_boundary(query, request)?;

    let decoded = match output_shape {
        PreparedFluentExistingRowsTerminalRuntimeRequest::CountRows => {
            output.into_count().map(FluentScalarTerminalOutput::Count)
        }
        PreparedFluentExistingRowsTerminalRuntimeRequest::ExistsRows => {
            output.into_exists().map(FluentScalarTerminalOutput::Exists)
        }
    };

    decoded.map_err(QueryError::execute)
}
/// Execute a fluent scalar terminal for `query` and decode its output as an id.
///
/// A decode failure is wrapped via `QueryError::execute`.
pub(in crate::db) fn execute_fluent_scalar_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: PreparedFluentScalarTerminalStrategy,
) -> Result<FluentScalarTerminalOutput<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let request = strategy.into_executor_request();
    let output = self.execute_scalar_terminal_boundary(query, request)?;

    output
        .into_id::<E>()
        .map(FluentScalarTerminalOutput::Id)
        .map_err(QueryError::execute)
}
/// Execute a fluent order-sensitive terminal for `query`.
///
/// The strategy yields the executor request plus a flag saying whether the
/// output is an id pair. The boundary output is decoded as an id pair or a single
/// id accordingly, and a decode failure is wrapped via `QueryError::execute`.
pub(in crate::db) fn execute_fluent_order_sensitive_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: PreparedFluentOrderSensitiveTerminalStrategy,
) -> Result<FluentScalarTerminalOutput<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (request, returns_id_pair) = strategy.into_executor_request();
    let output = self.execute_scalar_terminal_boundary(query, request)?;

    let decoded = if returns_id_pair {
        output
            .into_id_pair::<E>()
            .map(FluentScalarTerminalOutput::IdPair)
    } else {
        output.into_id::<E>().map(FluentScalarTerminalOutput::Id)
    };

    decoded.map_err(QueryError::execute)
}
/// Execute a fluent numeric-field terminal for `query`.
///
/// The strategy is split into the target field and the executor request, which
/// are passed to the load executor's numeric-field boundary. The result is an
/// optional `Decimal`.
pub(in crate::db) fn execute_fluent_numeric_field_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: PreparedFluentNumericFieldStrategy,
) -> Result<Option<Decimal>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (target_field, request) = strategy.into_executor_request();

    self.execute_load_query_with(query, move |executor, prepared| {
        executor.execute_numeric_field_boundary(prepared, target_field, request)
    })
}
/// Execute a fluent projection terminal for `query`.
///
/// The strategy is split into the target field, the executor request, and the
/// expected output shape. The boundary output is decoded to match that shape
/// (`Values` and `DistinctValues` share the values decoding), and a decode
/// failure is wrapped via `QueryError::execute`.
pub(in crate::db) fn execute_fluent_projection_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: PreparedFluentProjectionStrategy,
) -> Result<FluentProjectionTerminalOutput<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (target_field, request, output_shape) = strategy.into_executor_request();
    let output = self.execute_scalar_projection_boundary(query, target_field, request)?;

    let decoded = match output_shape {
        PreparedFluentProjectionRuntimeRequest::Values
        | PreparedFluentProjectionRuntimeRequest::DistinctValues => output
            .into_values()
            .map(FluentProjectionTerminalOutput::Values),
        PreparedFluentProjectionRuntimeRequest::CountDistinct => output
            .into_count()
            .map(FluentProjectionTerminalOutput::Count),
        PreparedFluentProjectionRuntimeRequest::ValuesWithIds => output
            .into_values_with_ids::<E>()
            .map(FluentProjectionTerminalOutput::ValuesWithIds),
        PreparedFluentProjectionRuntimeRequest::TerminalValue { .. } => output
            .into_terminal_value()
            .map(FluentProjectionTerminalOutput::TerminalValue),
    };

    decoded.map_err(QueryError::execute)
}
/// Execute the fluent `bytes` terminal for `query` on the load executor.
pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, |executor, prepared| executor.bytes(prepared))
}
/// Execute the fluent `bytes_by` terminal for `query` over `target_slot` on the
/// load executor.
pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
) -> Result<u64, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        executor.bytes_by_slot(prepared, target_slot)
    })
}
/// Execute the fluent `take` terminal for `query`, forwarding `take_count` to the
/// load executor.
pub(in crate::db) fn execute_fluent_take<E>(
    &self,
    query: &Query<E>,
    take_count: u32,
) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        executor.take(prepared, take_count)
    })
}
/// Execute a ranked-rows terminal for `query` over `target_slot`.
///
/// `descending == true` dispatches to the executor's `top_k_by_slot`; otherwise
/// `bottom_k_by_slot` is used. `take_count` is forwarded unchanged.
pub(in crate::db) fn execute_fluent_ranked_rows_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
    descending: bool,
) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        if descending {
            executor.top_k_by_slot(prepared, target_slot, take_count)
        } else {
            executor.bottom_k_by_slot(prepared, target_slot, take_count)
        }
    })
}
/// Execute a ranked-values terminal for `query` over `target_slot`.
///
/// `descending == true` dispatches to the executor's `top_k_by_values_slot`;
/// otherwise `bottom_k_by_values_slot` is used. `take_count` is forwarded unchanged.
pub(in crate::db) fn execute_fluent_ranked_values_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
    descending: bool,
) -> Result<Vec<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        if descending {
            executor.top_k_by_values_slot(prepared, target_slot, take_count)
        } else {
            executor.bottom_k_by_values_slot(prepared, target_slot, take_count)
        }
    })
}
/// Execute a ranked values-with-ids terminal for `query` over `target_slot`.
///
/// `descending == true` dispatches to the executor's `top_k_by_with_ids_slot`;
/// otherwise `bottom_k_by_with_ids_slot` is used. `take_count` is forwarded unchanged.
pub(in crate::db) fn execute_fluent_ranked_values_with_ids_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
    descending: bool,
) -> Result<Vec<(Id<E>, Value)>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_load_query_with(query, move |executor, prepared| {
        if descending {
            executor.top_k_by_with_ids_slot(prepared, target_slot, take_count)
        } else {
            executor.bottom_k_by_with_ids_slot(prepared, target_slot, take_count)
        }
    })
}
/// Build a `QueryTracePlan` for `query` without executing it.
///
/// The trace combines the logical plan's explain output and fingerprint string,
/// a summary of the executable access contract, the plan-cache reuse event, and
/// — for load queries only — the execution family. Delete queries report no
/// execution family.
///
/// # Errors
/// Propagates plan lookup failures; a failure to resolve the execution family is
/// wrapped via `QueryError::execute`.
pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
where
    E: EntityKind<Canister = C>,
{
    let (prepared, attribution) = self.cached_prepared_query_plan_for_entity::<E>(query)?;

    let logical = prepared.logical_plan();
    let explain = logical.explain();
    let plan_hash = logical.fingerprint().to_string();

    let access_contract = prepared.access().executable_contract();
    let access_strategy = summarize_executable_access_plan(&access_contract);

    let execution_family = match query.mode() {
        QueryMode::Delete(_) => None,
        QueryMode::Load(_) => {
            let family = prepared.execution_family().map_err(QueryError::execute)?;
            Some(trace_execution_family_from_executor(family))
        }
    };

    Ok(QueryTracePlan::new(
        plan_hash,
        access_strategy,
        execution_family,
        query_plan_cache_reuse_event(attribution),
        explain,
    ))
}
/// Execute one page of a scalar load query, optionally resuming from
/// `cursor_token`, and return the page together with its trace.
///
/// Steps, in order: look up the prepared plan; require the `Ordered` execution
/// family; decode the optional cursor token; let the plan prepare the cursor;
/// run the paged executor inside `with_metrics`; finalize the page and trace.
///
/// # Errors
/// Any step may fail. Cursor decoding and cursor preparation errors are converted
/// through the cursor/plan error mappers, and executor errors are wrapped via
/// `QueryError::execute`.
pub(crate) fn execute_load_query_paged_with_trace<E>(
    &self,
    query: &Query<E>,
    cursor_token: Option<&str>,
) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;

    let family = plan.execution_family().map_err(QueryError::execute)?;
    Self::ensure_scalar_paged_execution_family(family)?;

    let cursor_bytes =
        decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)?;
    let cursor = plan
        .prepare_cursor(cursor_bytes.as_deref())
        .map_err(query_error_from_executor_plan_error)?;

    let executed = self.with_metrics(|| {
        self.load_executor::<E>()
            .execute_paged_with_cursor_traced(plan, cursor)
    });
    let (page, trace) = executed.map_err(QueryError::execute)?;

    finalize_scalar_paged_execution(page, trace)
}
/// Execute one page of a grouped query, optionally resuming from `cursor_token`.
///
/// Looks up the prepared plan, runs it through `execute_grouped_plan_with_trace`,
/// and finalizes the structural grouped result together with its trace.
pub(in crate::db) fn execute_grouped<E>(
    &self,
    query: &Query<E>,
    cursor_token: Option<&str>,
) -> Result<PagedGroupedExecutionWithTrace, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (prepared, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
    let (grouped_result, trace) = self.execute_grouped_plan_with_trace(prepared, cursor_token)?;

    finalize_structural_grouped_projection_result(grouped_result, trace)
}
/// Shared seam for grouped execution over an already prepared plan.
///
/// Steps, in order: require the `Grouped` execution family; decode the optional
/// grouped cursor token; let the plan prepare the grouped cursor; run `op` with a
/// fresh load executor, the plan, and the cursor inside `with_metrics`.
///
/// # Errors
/// Family and cursor failures are returned before `op` runs. Errors from `op`
/// are wrapped via `QueryError::execute`.
pub(in crate::db::session) fn execute_grouped_plan_with<E, T>(
    &self,
    plan: PreparedExecutionPlan<E>,
    cursor_token: Option<&str>,
    op: impl FnOnce(
        LoadExecutor<E>,
        PreparedExecutionPlan<E>,
        crate::db::cursor::GroupedPlannedCursor,
    ) -> Result<T, InternalError>,
) -> Result<T, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let family = plan.execution_family().map_err(QueryError::execute)?;
    Self::ensure_grouped_execution_family(family)?;

    let decoded_token = decode_optional_grouped_cursor_token(cursor_token)
        .map_err(QueryError::from_cursor_plan_error)?;
    let grouped_cursor = plan
        .prepare_grouped_cursor_token(decoded_token)
        .map_err(query_error_from_executor_plan_error)?;

    let outcome = self.with_metrics(|| op(self.load_executor::<E>(), plan, grouped_cursor));

    outcome.map_err(QueryError::execute)
}
/// Run a prepared grouped plan through the traced, paged grouped executor and
/// return the structural grouped result with its optional execution trace.
///
/// Family validation and cursor handling are delegated to
/// `execute_grouped_plan_with`.
pub(in crate::db::session) fn execute_grouped_plan_with_trace<E>(
    &self,
    plan: PreparedExecutionPlan<E>,
    cursor_token: Option<&str>,
) -> Result<(StructuralGroupedProjectionResult, Option<ExecutionTrace>), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_grouped_plan_with(plan, cursor_token, |load, prepared, grouped_cursor| {
        load.execute_grouped_paged_with_cursor_traced(prepared, grouped_cursor)
    })
}
}