#[cfg(feature = "diagnostics")]
use crate::db::executor::{GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution};
use crate::{
db::{
DbSession, EntityResponse, LoadQueryResult, PersistedRow, Query, QueryError,
cursor::CursorPlanError,
diagnostics::ExecutionTrace,
executor::{
ExecutionFamily, ExecutorPlanError, LoadExecutor, PreparedExecutionPlan,
StructuralGroupedProjectionResult,
},
query::plan::QueryMode,
session::finalize_structural_grouped_projection_result,
},
error::InternalError,
traits::{CanisterKind, EntityValue},
};
#[expect(
clippy::large_enum_variant,
reason = "the grouped execution result stays inline to avoid adding a boxed allocation on query execution paths"
)]
pub(in crate::db::session::query) enum PreparedQueryExecutionOutcome<E>
where
E: PersistedRow,
{
Scalar {
rows: EntityResponse<E>,
#[cfg(feature = "diagnostics")]
phase: Option<ScalarExecutePhaseAttribution>,
#[cfg(feature = "diagnostics")]
response_decode_local_instructions: u64,
},
Grouped {
result: StructuralGroupedProjectionResult,
trace: Option<ExecutionTrace>,
#[cfg(feature = "diagnostics")]
phase: Option<GroupedExecutePhaseAttribution>,
},
Delete {
rows: EntityResponse<E>,
},
DeleteCount {
row_count: u32,
},
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::query) enum PreparedQueryExecutionOutput {
Rows,
DeleteCount,
}
pub(in crate::db::session) fn query_error_from_executor_plan_error(
err: ExecutorPlanError,
) -> QueryError {
match err {
ExecutorPlanError::Cursor(err) => QueryError::from_cursor_plan_error(*err),
}
}
impl<C: CanisterKind> DbSession<C> {
pub(in crate::db::session::query) fn ensure_scalar_paged_execution_family(
family: ExecutionFamily,
) -> Result<(), QueryError> {
match family {
ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
)),
ExecutionFamily::Ordered => Ok(()),
ExecutionFamily::Grouped => Err(QueryError::invariant(
"grouped queries execute via execute(), not page().execute()",
)),
}
}
pub(in crate::db::session::query) fn ensure_grouped_execution_family(
family: ExecutionFamily,
) -> Result<(), QueryError> {
match family {
ExecutionFamily::Grouped => Ok(()),
ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
"grouped execution requires grouped logical plans",
)),
}
}
pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_query_result(query)
.and_then(LoadQueryResult::into_rows)
}
#[doc(hidden)]
pub fn execute_query_result<E>(
&self,
query: &Query<E>,
) -> Result<LoadQueryResult<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
self.execute_prepared(plan, false, PreparedQueryExecutionOutput::Rows)
.and_then(Self::load_result_from_prepared_outcome)
}
#[doc(hidden)]
pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
if !query.mode().is_delete() {
return Err(QueryError::unsupported_query(
"delete count execution requires delete query mode",
));
}
let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
match self.execute_prepared(plan, false, PreparedQueryExecutionOutput::DeleteCount)? {
PreparedQueryExecutionOutcome::DeleteCount { row_count } => Ok(row_count),
PreparedQueryExecutionOutcome::Scalar { .. }
| PreparedQueryExecutionOutcome::Grouped { .. }
| PreparedQueryExecutionOutcome::Delete { .. } => Err(QueryError::invariant(
"delete count execution returned non-count result",
)),
}
}
pub(in crate::db::session::query) fn execute_prepared<E>(
&self,
plan: PreparedExecutionPlan<E>,
collect_attribution: bool,
output: PreparedQueryExecutionOutput,
) -> Result<PreparedQueryExecutionOutcome<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
#[cfg(not(feature = "diagnostics"))]
let _ = collect_attribution;
if plan.is_grouped() {
if output == PreparedQueryExecutionOutput::DeleteCount {
return Err(QueryError::invariant(
"delete count execution requires delete query mode",
));
}
#[cfg(feature = "diagnostics")]
if collect_attribution {
let (result, trace, phase) =
self.execute_grouped_with_cursor(plan, None, |executor, plan, cursor| {
executor.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
plan, cursor,
)
})?;
return Ok(PreparedQueryExecutionOutcome::Grouped {
result,
trace,
phase: Some(phase),
});
}
let (result, trace) = self.execute_grouped_with_trace(plan, None)?;
return Ok(PreparedQueryExecutionOutcome::Grouped {
result,
trace,
#[cfg(feature = "diagnostics")]
phase: None,
});
}
match plan.mode() {
QueryMode::Load(_) => {
if output == PreparedQueryExecutionOutput::DeleteCount {
return Err(QueryError::invariant(
"delete count execution requires delete query mode",
));
}
#[cfg(feature = "diagnostics")]
if collect_attribution {
let (rows, phase, response_decode_local_instructions) = self
.load_executor::<E>()
.execute_with_phase_attribution(plan)
.map_err(QueryError::execute)?;
return Ok(PreparedQueryExecutionOutcome::Scalar {
rows,
phase: Some(phase),
response_decode_local_instructions,
});
}
let rows = self
.with_metrics(|| self.load_executor::<E>().execute(plan))
.map_err(QueryError::execute)?;
Ok(PreparedQueryExecutionOutcome::Scalar {
rows,
#[cfg(feature = "diagnostics")]
phase: None,
#[cfg(feature = "diagnostics")]
response_decode_local_instructions: 0,
})
}
QueryMode::Delete(_) => match output {
PreparedQueryExecutionOutput::Rows => {
let rows = self
.with_metrics(|| self.delete_executor::<E>().execute(plan))
.map_err(QueryError::execute)?;
Ok(PreparedQueryExecutionOutcome::Delete { rows })
}
PreparedQueryExecutionOutput::DeleteCount => {
let row_count = self
.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
.map_err(QueryError::execute)?;
Ok(PreparedQueryExecutionOutcome::DeleteCount { row_count })
}
},
}
}
fn load_result_from_prepared_outcome<E>(
outcome: PreparedQueryExecutionOutcome<E>,
) -> Result<LoadQueryResult<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
match outcome {
PreparedQueryExecutionOutcome::Scalar { rows, .. }
| PreparedQueryExecutionOutcome::Delete { rows } => Ok(LoadQueryResult::Rows(rows)),
PreparedQueryExecutionOutcome::Grouped { result, trace, .. } => {
finalize_structural_grouped_projection_result(result, trace)
.map(LoadQueryResult::Grouped)
}
PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant(
"delete count result cannot be converted to load query result",
)),
}
}
pub(in crate::db) fn execute_with_plan<E, T>(
&self,
query: &Query<E>,
op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
) -> Result<T, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
self.with_metrics(|| op(self.load_executor::<E>(), plan))
.map_err(QueryError::execute)
}
}