#[cfg(feature = "diagnostics")]
use crate::db::session::{
query::QueryPlanCompilePhaseAttribution,
sql::{SqlExecutePhaseAttribution, measure_sql_stage},
};
use crate::{
db::{
DbSession, PersistedRow, QueryError,
executor::{
EntityAuthority, SharedPreparedExecutionPlan, StructuralGroupedProjectionResult,
},
query::intent::StructuralQuery,
schema::AcceptedSchemaSnapshot,
session::{
finalize_structural_grouped_projection_result,
sql::SqlProjectionContract,
sql::projection::{SqlProjectionPayload, execute_sql_projection_rows_for_canister},
sql::{SqlCacheAttribution, SqlCompiledCommandExecutionContext, SqlStatementResult},
sql_grouped_cursor_from_bytes,
},
},
traits::{CanisterKind, EntityValue},
};
use super::diagnostics::GroupedSqlDiagnosticsCollector;
#[cfg(feature = "diagnostics")]
use super::diagnostics::measure_execute_phase_with_physical_access;
#[cfg(feature = "diagnostics")]
use super::select_plan::ResolvedSelectPreparedPlan;
#[cfg(feature = "diagnostics")]
use crate::db::session::sql::projection::execute_sql_projection_rows_for_canister_with_direct_data_row_attribution;
impl<C: CanisterKind> DbSession<C> {
pub(in crate::db::session::sql::execute) fn grouped_sql_statement_result_from_result(
columns: Vec<String>,
fixed_scales: Vec<Option<u32>>,
result: StructuralGroupedProjectionResult,
) -> Result<SqlStatementResult, QueryError> {
let row_count = result.row_count();
let grouped = finalize_structural_grouped_projection_result(result, None)?;
let (rows, continuation_cursor, _) = grouped.into_rows_cursor_and_trace();
let next_cursor = sql_grouped_cursor_from_bytes(continuation_cursor);
Ok(SqlStatementResult::Grouped {
columns,
fixed_scales,
rows,
row_count,
next_cursor,
})
}
fn execute_sql_projection_from_structural_prepared_plan(
&self,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
cache_attribution: SqlCacheAttribution,
) -> Result<(SqlProjectionPayload, SqlCacheAttribution), QueryError> {
let (columns, fixed_scales) = projection.into_components();
let (rows, row_count) =
execute_sql_projection_rows_for_canister(&self.db, self.debug, prepared_plan)
.map_err(QueryError::execute)?;
Ok((
SqlProjectionPayload::new(columns, fixed_scales, rows, row_count),
cache_attribution,
))
}
fn execute_sql_statement_from_structural_prepared_plan(
&self,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
cache_attribution: SqlCacheAttribution,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError> {
let (payload, cache_attribution) = self
.execute_sql_projection_from_structural_prepared_plan(
prepared_plan,
projection,
cache_attribution,
)?;
Ok((payload.into_statement_result(), cache_attribution))
}
fn execute_grouped_sql_core<T>(
&self,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
diagnostics: Option<GroupedSqlDiagnosticsCollector<'_>>,
execute_grouped: impl FnOnce(
&Self,
SharedPreparedExecutionPlan,
)
-> Result<(StructuralGroupedProjectionResult, T), QueryError>,
) -> Result<(SqlStatementResult, T), QueryError> {
let (columns, fixed_scales) = projection.into_components();
let (result, extra) = execute_grouped(self, prepared_plan)?;
let statement_result = if let Some(diagnostics) = diagnostics {
diagnostics.finalize_grouped_sql_statement::<C>(columns, fixed_scales, result)?
} else {
Self::grouped_sql_statement_result_from_result(columns, fixed_scales, result)?
};
Ok((statement_result, extra))
}
fn execute_grouped_sql_statement_from_prepared_plan<T>(
&self,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
execute_grouped: impl FnOnce(
&Self,
SharedPreparedExecutionPlan,
)
-> Result<(StructuralGroupedProjectionResult, T), QueryError>,
) -> Result<(SqlStatementResult, T), QueryError> {
self.execute_grouped_sql_core(prepared_plan, projection, None, execute_grouped)
}
#[cfg(feature = "diagnostics")]
fn execute_grouped_sql_statement_with_response_attribution<T>(
&self,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
execute_grouped: impl FnOnce(
&Self,
SharedPreparedExecutionPlan,
)
-> Result<(StructuralGroupedProjectionResult, T), QueryError>,
) -> Result<(SqlStatementResult, T, u64), QueryError> {
let mut response_finalization_local_instructions = 0;
let diagnostics =
GroupedSqlDiagnosticsCollector::new(&mut response_finalization_local_instructions);
let (statement_result, extra) = self.execute_grouped_sql_core(
prepared_plan,
projection,
Some(diagnostics),
execute_grouped,
)?;
Ok((
statement_result,
extra,
response_finalization_local_instructions,
))
}
pub(in crate::db::session::sql) fn execute_sql_projection_from_structural_query_without_sql_compiled_cache(
&self,
query: StructuralQuery,
authority: EntityAuthority,
accepted_schema: &AcceptedSchemaSnapshot,
) -> Result<(SqlProjectionPayload, SqlCacheAttribution), QueryError> {
let (prepared_plan, projection, cache_attribution) = self
.sql_select_prepared_plan_for_accepted_authority(&query, authority, accepted_schema)?;
self.execute_sql_projection_from_structural_prepared_plan(
prepared_plan,
projection,
cache_attribution,
)
}
#[cfg(feature = "diagnostics")]
pub(super) fn execute_select_compiled_sql_with_phase_attribution_from_resolver<E>(
&self,
query: &StructuralQuery,
resolve_plan: impl FnOnce() -> Result<
(ResolvedSelectPreparedPlan, QueryPlanCompilePhaseAttribution),
QueryError,
>,
) -> Result<
(
SqlStatementResult,
SqlCacheAttribution,
SqlExecutePhaseAttribution,
),
QueryError,
>
where
E: PersistedRow<Canister = C> + EntityValue,
{
if query.has_grouping() {
let (planner_local_instructions, resolved_query_plan) = measure_sql_stage(resolve_plan);
let (resolved, plan_compile_attribution) = resolved_query_plan?;
let (prepared_plan, projection, cache_attribution) = resolved.into_parts();
let ((execute_local_instructions, store_local_instructions), statement_result) =
measure_execute_phase_with_physical_access(move || {
self.execute_grouped_sql_statement_with_response_attribution(
prepared_plan,
projection,
|session, prepared_plan| {
let plan = prepared_plan
.typed_clone::<E>()
.map_err(QueryError::execute)?;
session
.execute_grouped_with_phase_attribution(plan, None)
.map(|(result, _trace, phase_attribution)| {
(result, phase_attribution)
})
},
)
});
let (
statement_result,
grouped_phase_attribution,
response_finalization_local_instructions,
) = statement_result?;
return Ok((
statement_result,
cache_attribution,
SqlExecutePhaseAttribution::from_grouped_select_phase(
planner_local_instructions,
plan_compile_attribution,
execute_local_instructions,
store_local_instructions,
response_finalization_local_instructions,
grouped_phase_attribution,
),
));
}
let (planner_local_instructions, resolved_query_plan) = measure_sql_stage(resolve_plan);
let (resolved, plan_compile_attribution) = resolved_query_plan?;
let (prepared_plan, projection, cache_attribution) = resolved.into_parts();
let ((execute_local_instructions, store_local_instructions), payload) =
measure_execute_phase_with_physical_access(move || {
let (columns, fixed_scales) = projection.into_components();
execute_sql_projection_rows_for_canister_with_direct_data_row_attribution(
&self.db,
self.debug,
prepared_plan,
)
.map(|((rows, row_count), direct_data_row, kernel_row)| {
(
SqlProjectionPayload::new(columns, fixed_scales, rows, row_count),
direct_data_row,
kernel_row,
)
})
.map_err(QueryError::execute)
});
let (payload, direct_data_row, kernel_row) = payload?;
let (response_finalization_local_instructions, statement_result) =
measure_sql_stage(|| Ok::<_, QueryError>(payload.into_statement_result()));
let statement_result = statement_result?;
Ok((
statement_result,
cache_attribution,
SqlExecutePhaseAttribution::from_projection_select_phase(
planner_local_instructions,
plan_compile_attribution,
execute_local_instructions,
store_local_instructions,
response_finalization_local_instructions,
direct_data_row,
kernel_row,
),
))
}
pub(super) fn execute_select_compiled_sql_with_cache_attribution<E>(
&self,
query: &StructuralQuery,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let catalog = self
.accepted_schema_catalog_context_for_query::<E>()
.map_err(QueryError::execute)?;
let authority = catalog
.accepted_entity_authority_for::<E>()
.map_err(QueryError::execute)?;
let resolved = self
.resolve_select_prepared_plan_for_authority_with_catalog(query, authority, &catalog)?;
let (prepared_plan, projection, cache_attribution) = resolved.into_parts();
self.execute_select_compiled_sql_from_prepared_plan::<E>(
query,
prepared_plan,
projection,
cache_attribution,
)
}
pub(super) fn execute_select_compiled_sql_with_context<E>(
&self,
query: &StructuralQuery,
context: &SqlCompiledCommandExecutionContext,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let resolved = self.resolve_select_prepared_plan_for_context::<E>(query, context)?;
let (prepared_plan, projection, cache_attribution) = resolved.into_parts();
self.execute_select_compiled_sql_from_prepared_plan::<E>(
query,
prepared_plan,
projection,
cache_attribution,
)
}
#[cfg(feature = "diagnostics")]
pub(super) fn execute_select_compiled_sql_with_context_phase_attribution<E>(
&self,
query: &StructuralQuery,
context: &SqlCompiledCommandExecutionContext,
) -> Result<
(
SqlStatementResult,
SqlCacheAttribution,
SqlExecutePhaseAttribution,
),
QueryError,
>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_select_compiled_sql_with_phase_attribution_from_resolver::<E>(query, || {
self.resolve_select_prepared_plan_for_context_with_compile_phase_attribution::<E>(
query, context,
)
})
}
fn execute_select_compiled_sql_from_prepared_plan<E>(
&self,
query: &StructuralQuery,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
cache_attribution: SqlCacheAttribution,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
if query.has_grouping() {
let (statement_result, ()) = self.execute_grouped_sql_statement_from_prepared_plan(
prepared_plan,
projection,
|session, prepared_plan| {
let plan = prepared_plan
.typed_clone::<E>()
.map_err(QueryError::execute)?;
session
.execute_grouped_with_trace(plan, None)
.map(|(result, _trace)| (result, ()))
},
)?;
return Ok((statement_result, cache_attribution));
}
self.execute_sql_statement_from_structural_prepared_plan(
prepared_plan,
projection,
cache_attribution,
)
}
}