#[cfg(feature = "diagnostics")]
use crate::db::session::{
query::QueryPlanCompilePhaseAttribution,
sql::{SqlExecutePhaseAttribution, measure_sql_stage},
};
use crate::{
db::{
DbSession, PersistedRow, QueryError,
executor::{
EntityAuthority, SharedPreparedExecutionPlan, StructuralGroupedProjectionResult,
},
query::intent::StructuralQuery,
schema::AcceptedSchemaSnapshot,
session::{
finalize_structural_grouped_projection_result,
sql::SqlProjectionContract,
sql::projection::{SqlProjectionPayload, execute_sql_projection_rows_for_canister},
sql::{SqlCacheAttribution, SqlCompiledCommandExecutionContext, SqlStatementResult},
sql_grouped_cursor_from_bytes,
},
},
traits::{CanisterKind, EntityValue},
};
#[cfg(test)]
use crate::{
db::{
GroupedRow,
executor::initial_read_plan_requires_materialized_sort,
query::{
admission::{QueryAdmissionPolicy, QueryAdmissionSummary, QueryMaterializationSummary},
plan::GroupedExecutionConfig,
},
},
error::InternalError,
value::OutputValue,
};
#[cfg(test)]
use candid::{CandidType, Encode};
use super::diagnostics::GroupedSqlDiagnosticsCollector;
#[cfg(feature = "diagnostics")]
use super::diagnostics::measure_execute_phase_with_physical_access;
#[cfg(feature = "diagnostics")]
use super::select_plan::ResolvedSelectPreparedPlan;
#[cfg(feature = "diagnostics")]
use crate::db::session::sql::projection::execute_sql_projection_rows_for_canister_with_direct_data_row_attribution;
/// Test-only Candid envelope used when measuring the encoded size of a SQL
/// read response; it wraps one of the two probe payload records.
#[cfg(test)]
#[derive(CandidType)]
enum SqlReadResponseSizeProbe {
    /// Projection-shaped payload.
    Projection(SqlReadProjectionSizeProbe),
    /// Grouped payload, which additionally carries an optional cursor.
    Grouped(SqlReadGroupedSizeProbe),
}
/// Test-only Candid record carrying the fields of a projection response so
/// that its encoded length can be measured.
#[cfg(test)]
#[derive(CandidType)]
struct SqlReadProjectionSizeProbe {
    /// Column labels.
    columns: Vec<String>,
    /// Optional fixed scales.
    /// NOTE(review): presumably one entry per column — confirm against the producer.
    fixed_scales: Vec<Option<u32>>,
    /// Projected rows, each a list of output values.
    rows: Vec<Vec<OutputValue>>,
    /// Row count reported alongside `rows`.
    row_count: u32,
}
/// Test-only Candid record carrying the fields of a grouped response so that
/// its encoded length can be measured.
#[cfg(test)]
#[derive(CandidType)]
struct SqlReadGroupedSizeProbe {
    /// Column labels.
    columns: Vec<String>,
    /// Optional fixed scales.
    fixed_scales: Vec<Option<u32>>,
    /// Grouped rows in their probe form.
    rows: Vec<SqlReadGroupedRowSizeProbe>,
    /// Row count reported alongside `rows`.
    row_count: u32,
    /// Continuation cursor, when one is present.
    next_cursor: Option<String>,
}
/// Test-only Candid record holding an owned copy of one grouped row's values.
#[cfg(test)]
#[derive(CandidType)]
struct SqlReadGroupedRowSizeProbe {
    /// Values of the grouping key.
    group_key: Vec<OutputValue>,
    /// Aggregate values of the row.
    aggregate_values: Vec<OutputValue>,
}
impl<C: CanisterKind> DbSession<C> {
/// Converts a structural grouped projection result into
/// `SqlStatementResult::Grouped`.
///
/// The result is finalized with no extra argument (`None`), split into rows
/// and continuation-cursor bytes, and the cursor bytes are converted with
/// `sql_grouped_cursor_from_bytes`. The trace component is discarded.
///
/// # Errors
/// Propagates any failure reported by
/// `finalize_structural_grouped_projection_result`.
pub(in crate::db::session::sql::execute) fn grouped_sql_statement_result_from_result(
    columns: Vec<String>,
    fixed_scales: Vec<Option<u32>>,
    result: StructuralGroupedProjectionResult,
) -> Result<SqlStatementResult, QueryError> {
    // Read the row count first: finalization below consumes `result`.
    let row_count = result.row_count();
    let (rows, cursor_bytes, _) =
        finalize_structural_grouped_projection_result(result, None)?.into_rows_cursor_and_trace();

    Ok(SqlStatementResult::Grouped {
        columns,
        fixed_scales,
        rows,
        row_count,
        next_cursor: sql_grouped_cursor_from_bytes(cursor_bytes),
    })
}
/// Executes a prepared plan as a SQL projection and packages the rows
/// together with the projection's column metadata.
///
/// `cache_attribution` is returned unchanged next to the payload.
///
/// # Errors
/// Row execution failures are wrapped with `QueryError::execute`.
fn execute_sql_projection_from_structural_prepared_plan(
    &self,
    prepared_plan: SharedPreparedExecutionPlan,
    projection: SqlProjectionContract,
    cache_attribution: SqlCacheAttribution,
) -> Result<(SqlProjectionPayload, SqlCacheAttribution), QueryError> {
    let (columns, fixed_scales) = projection.into_components();

    execute_sql_projection_rows_for_canister(&self.db, self.debug, prepared_plan)
        .map(|(rows, row_count)| {
            (
                SqlProjectionPayload::new(columns, fixed_scales, rows, row_count),
                cache_attribution,
            )
        })
        .map_err(QueryError::execute)
}
/// Executes a prepared plan as a SQL projection and converts the resulting
/// payload into a `SqlStatementResult`.
///
/// `cache_attribution` is passed through the projection step and returned
/// next to the statement result.
///
/// # Errors
/// Propagates any error from the projection execution step.
fn execute_sql_statement_from_structural_prepared_plan(
    &self,
    prepared_plan: SharedPreparedExecutionPlan,
    projection: SqlProjectionContract,
    cache_attribution: SqlCacheAttribution,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError> {
    self.execute_sql_projection_from_structural_prepared_plan(
        prepared_plan,
        projection,
        cache_attribution,
    )
    .map(|(payload, attribution)| (payload.into_statement_result(), attribution))
}
/// Shared grouped execution path.
///
/// Runs `execute_grouped` against the prepared plan, then builds the
/// statement result: through the diagnostics collector when one is supplied,
/// otherwise through `grouped_sql_statement_result_from_result`. The second
/// element produced by `execute_grouped` is returned unchanged.
///
/// # Errors
/// Propagates errors from `execute_grouped` and from either finalization
/// path.
fn execute_grouped_sql_core<T>(
    &self,
    prepared_plan: SharedPreparedExecutionPlan,
    projection: SqlProjectionContract,
    diagnostics: Option<GroupedSqlDiagnosticsCollector<'_>>,
    execute_grouped: impl FnOnce(
        &Self,
        SharedPreparedExecutionPlan,
    )
        -> Result<(StructuralGroupedProjectionResult, T), QueryError>,
) -> Result<(SqlStatementResult, T), QueryError> {
    let (columns, fixed_scales) = projection.into_components();
    let (grouped_result, extra) = execute_grouped(self, prepared_plan)?;

    let statement_result = match diagnostics {
        Some(collector) => collector.finalize_grouped_sql_statement::<C>(
            columns,
            fixed_scales,
            grouped_result,
        )?,
        None => Self::grouped_sql_statement_result_from_result(
            columns,
            fixed_scales,
            grouped_result,
        )?,
    };

    Ok((statement_result, extra))
}
fn execute_grouped_sql_statement_from_prepared_plan<T>(
&self,
prepared_plan: SharedPreparedExecutionPlan,
projection: SqlProjectionContract,
execute_grouped: impl FnOnce(
&Self,
SharedPreparedExecutionPlan,
)
-> Result<(StructuralGroupedProjectionResult, T), QueryError>,
) -> Result<(SqlStatementResult, T), QueryError> {
self.execute_grouped_sql_core(prepared_plan, projection, None, execute_grouped)
}
/// Runs the grouped execution core with a diagnostics collector attached and
/// returns the response-finalization instruction counter as a third tuple
/// element.
///
/// The counter starts at zero and is handed to the collector by mutable
/// reference; its value after the core returns is what gets reported.
///
/// # Errors
/// Propagates any error from `execute_grouped_sql_core`.
#[cfg(feature = "diagnostics")]
fn execute_grouped_sql_statement_with_response_attribution<T>(
    &self,
    prepared_plan: SharedPreparedExecutionPlan,
    projection: SqlProjectionContract,
    execute_grouped: impl FnOnce(
        &Self,
        SharedPreparedExecutionPlan,
    )
        -> Result<(StructuralGroupedProjectionResult, T), QueryError>,
) -> Result<(SqlStatementResult, T, u64), QueryError> {
    let mut finalization_instructions: u64 = 0;

    let (statement_result, extra) = self.execute_grouped_sql_core(
        prepared_plan,
        projection,
        Some(GroupedSqlDiagnosticsCollector::new(
            &mut finalization_instructions,
        )),
        execute_grouped,
    )?;

    Ok((statement_result, extra, finalization_instructions))
}
/// Prepares a select plan for `query` under the given authority and accepted
/// schema, then executes it as a SQL projection.
///
/// # Errors
/// Propagates plan-preparation errors and projection execution errors.
pub(in crate::db::session::sql) fn execute_sql_projection_from_structural_query_without_sql_compiled_cache(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
    accepted_schema: &AcceptedSchemaSnapshot,
) -> Result<(SqlProjectionPayload, SqlCacheAttribution), QueryError> {
    let (plan, contract, attribution) =
        self.sql_select_prepared_plan_for_accepted_authority(&query, authority, accepted_schema)?;

    self.execute_sql_projection_from_structural_prepared_plan(plan, contract, attribution)
}
/// Executes one compiled SELECT while recording per-phase measurements.
///
/// `resolve_plan` is invoked once inside `measure_sql_stage`; execution runs
/// inside `measure_execute_phase_with_physical_access`. Grouped queries go
/// through the grouped path with response attribution, every other query
/// through the projection path with direct-data-row attribution. The
/// collected measurements are assembled into a `SqlExecutePhaseAttribution`
/// via the constructor matching the path taken.
///
/// # Errors
/// Propagates plan-resolution errors and execution errors; projection row
/// execution failures are wrapped with `QueryError::execute`.
#[cfg(feature = "diagnostics")]
pub(super) fn execute_select_compiled_sql_with_phase_attribution_from_resolver<E>(
    &self,
    query: &StructuralQuery,
    resolve_plan: impl FnOnce() -> Result<
        (ResolvedSelectPreparedPlan, QueryPlanCompilePhaseAttribution),
        QueryError,
    >,
) -> Result<
    (
        SqlStatementResult,
        SqlCacheAttribution,
        SqlExecutePhaseAttribution,
    ),
    QueryError,
>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // Pick the result shape first, then run the plan-resolution prologue that
    // both shapes share.
    let is_grouped = query.has_grouping();
    let (planner_local_instructions, resolution) = measure_sql_stage(resolve_plan);
    let (resolved, plan_compile_attribution) = resolution?;
    let (prepared_plan, projection, cache_attribution) = resolved.into_parts();

    if is_grouped {
        let ((execute_local_instructions, store_local_instructions), grouped_outcome) =
            measure_execute_phase_with_physical_access(move || {
                self.execute_grouped_sql_statement_with_response_attribution(
                    prepared_plan,
                    projection,
                    |session, shared_plan| {
                        let typed_plan = shared_plan
                            .typed_clone::<E>()
                            .map_err(QueryError::execute)?;
                        let (result, _trace, phase_attribution) =
                            session.execute_grouped_with_phase_attribution(typed_plan, None)?;
                        Ok((result, phase_attribution))
                    },
                )
            });
        let (
            statement_result,
            grouped_phase_attribution,
            response_finalization_local_instructions,
        ) = grouped_outcome?;

        let phase_attribution = SqlExecutePhaseAttribution::from_grouped_select_phase(
            planner_local_instructions,
            plan_compile_attribution,
            execute_local_instructions,
            store_local_instructions,
            response_finalization_local_instructions,
            grouped_phase_attribution,
        );
        return Ok((statement_result, cache_attribution, phase_attribution));
    }

    let ((execute_local_instructions, store_local_instructions), projected) =
        measure_execute_phase_with_physical_access(move || {
            let (columns, fixed_scales) = projection.into_components();
            execute_sql_projection_rows_for_canister_with_direct_data_row_attribution(
                &self.db,
                self.debug,
                prepared_plan,
            )
            .map(|((rows, row_count), direct_data_row, kernel_row)| {
                (
                    SqlProjectionPayload::new(columns, fixed_scales, rows, row_count),
                    direct_data_row,
                    kernel_row,
                )
            })
            .map_err(QueryError::execute)
        });
    let (payload, direct_data_row, kernel_row) = projected?;

    // Converting the payload into the statement result is measured as its
    // own stage.
    let (response_finalization_local_instructions, finalized) =
        measure_sql_stage(|| Ok::<_, QueryError>(payload.into_statement_result()));
    let statement_result = finalized?;

    let phase_attribution = SqlExecutePhaseAttribution::from_projection_select_phase(
        planner_local_instructions,
        plan_compile_attribution,
        execute_local_instructions,
        store_local_instructions,
        response_finalization_local_instructions,
        direct_data_row,
        kernel_row,
    );
    Ok((statement_result, cache_attribution, phase_attribution))
}
/// Resolves a prepared select plan for entity `E` through the accepted schema
/// catalog and executes it.
///
/// # Errors
/// Catalog and authority lookup failures are wrapped with
/// `QueryError::execute`; plan-resolution and execution errors are
/// propagated as-is.
pub(super) fn execute_select_compiled_sql_with_cache_attribution<E>(
    &self,
    query: &StructuralQuery,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let catalog = self
        .accepted_schema_catalog_context_for_query::<E>()
        .map_err(QueryError::execute)?;
    let authority = catalog
        .accepted_entity_authority_for::<E>()
        .map_err(QueryError::execute)?;

    let (plan, contract, attribution) = self
        .resolve_select_prepared_plan_for_authority_with_catalog(query, authority, &catalog)?
        .into_parts();

    self.execute_select_compiled_sql_from_prepared_plan::<E>(query, plan, contract, attribution)
}
/// Resolves a prepared select plan for entity `E` from a compiled-command
/// execution context and executes it.
///
/// # Errors
/// Propagates plan-resolution and execution errors.
pub(super) fn execute_select_compiled_sql_with_context<E>(
    &self,
    query: &StructuralQuery,
    context: &SqlCompiledCommandExecutionContext,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (plan, contract, attribution) = self
        .resolve_select_prepared_plan_for_context::<E>(query, context)?
        .into_parts();

    self.execute_select_compiled_sql_from_prepared_plan::<E>(query, plan, contract, attribution)
}
/// Test-only variant of context-based select execution that enforces a read
/// admission policy.
///
/// Steps, in order: resolve the plan, apply the policy's grouped execution
/// caps to it, check plan admission, execute, then check the response byte
/// limit on the produced statement result.
///
/// # Errors
/// Returns the first error raised by any of those steps.
#[cfg(test)]
pub(super) fn execute_select_compiled_sql_with_context_and_read_admission_policy<E>(
    &self,
    query: &StructuralQuery,
    context: &SqlCompiledCommandExecutionContext,
    policy: &QueryAdmissionPolicy,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (resolved_plan, projection, cache_attribution) = self
        .resolve_select_prepared_plan_for_context::<E>(query, context)?
        .into_parts();

    let capped_plan = prepared_read_admission_plan_with_execution_caps(policy, resolved_plan)?;
    enforce_read_admission_policy(policy, &capped_plan)?;

    let (statement_result, cache_attribution) = self
        .execute_select_compiled_sql_from_prepared_plan::<E>(
            query,
            capped_plan,
            projection,
            cache_attribution,
        )?;

    // The byte limit is checked after execution, against the actual result.
    enforce_sql_read_response_byte_policy(policy, &statement_result)?;
    Ok((statement_result, cache_attribution))
}
/// Context-based select execution with phase attribution.
///
/// Supplies a resolver that resolves the plan from `context` together with
/// compile-phase attribution, and hands it to the resolver-driven phase
/// attribution entry point.
///
/// # Errors
/// Propagates plan-resolution and execution errors.
#[cfg(feature = "diagnostics")]
pub(super) fn execute_select_compiled_sql_with_context_phase_attribution<E>(
    &self,
    query: &StructuralQuery,
    context: &SqlCompiledCommandExecutionContext,
) -> Result<
    (
        SqlStatementResult,
        SqlCacheAttribution,
        SqlExecutePhaseAttribution,
    ),
    QueryError,
>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // Not called here: the callee decides when resolution runs.
    let resolve_from_context = || {
        self.resolve_select_prepared_plan_for_context_with_compile_phase_attribution::<E>(
            query, context,
        )
    };

    self.execute_select_compiled_sql_with_phase_attribution_from_resolver::<E>(
        query,
        resolve_from_context,
    )
}
/// Executes an already prepared select plan, choosing the grouped or the
/// projection path based on `query.has_grouping()`.
///
/// On the grouped path the shared plan is cloned into its typed form for `E`
/// and run with tracing; the trace is discarded. `cache_attribution` is
/// returned unchanged on both paths.
///
/// # Errors
/// Typed-clone failures are wrapped with `QueryError::execute`; execution
/// errors are propagated.
fn execute_select_compiled_sql_from_prepared_plan<E>(
    &self,
    query: &StructuralQuery,
    prepared_plan: SharedPreparedExecutionPlan,
    projection: SqlProjectionContract,
    cache_attribution: SqlCacheAttribution,
) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    if !query.has_grouping() {
        return self.execute_sql_statement_from_structural_prepared_plan(
            prepared_plan,
            projection,
            cache_attribution,
        );
    }

    self.execute_grouped_sql_statement_from_prepared_plan(
        prepared_plan,
        projection,
        |session, shared_plan| {
            let typed_plan = shared_plan
                .typed_clone::<E>()
                .map_err(QueryError::execute)?;
            let (result, _trace) = session.execute_grouped_with_trace(typed_plan, None)?;
            Ok((result, ()))
        },
    )
    .map(|(statement_result, ())| (statement_result, cache_attribution))
}
}
/// Evaluates `policy` against an admission summary built from the prepared
/// plan's logical plan.
///
/// When the plan requires a materialized sort, the summary is extended with a
/// sort materialization derived from its own returned-row bound and bound
/// kind before evaluation.
///
/// # Errors
/// Returns a `QueryError` built from the rejection code when the policy
/// rejects the summary. A failure of the materialized-sort check is wrapped
/// with `QueryError::execute`.
#[cfg(test)]
pub(super) fn enforce_read_admission_policy(
    policy: &QueryAdmissionPolicy,
    prepared_plan: &SharedPreparedExecutionPlan,
) -> Result<(), QueryError> {
    let base_summary =
        QueryAdmissionSummary::from_plan(policy.lane(), prepared_plan.logical_plan());
    let needs_materialized_sort =
        initial_read_plan_requires_materialized_sort(prepared_plan).map_err(QueryError::execute)?;

    let summary = if needs_materialized_sort {
        // Build the sort summary before handing `base_summary` over.
        let sort_materialization = QueryMaterializationSummary::sort(
            base_summary.returned_row_bound(),
            base_summary.returned_row_bound_kind(),
        );
        base_summary.with_materialization(sort_materialization)
    } else {
        base_summary
    };

    let decision = policy.evaluate(summary);
    match decision.rejection() {
        Some(rejection) => Err(QueryError::from(rejection.code())),
        None => Ok(()),
    }
}
/// Applies the policy-derived grouped execution caps to `prepared_plan`.
///
/// When no grouped execution config can be derived for the plan, the plan is
/// returned untouched.
///
/// # Errors
/// A failure to apply the config is wrapped with `QueryError::execute`.
#[cfg(test)]
fn prepared_read_admission_plan_with_execution_caps(
    policy: &QueryAdmissionPolicy,
    prepared_plan: SharedPreparedExecutionPlan,
) -> Result<SharedPreparedExecutionPlan, QueryError> {
    match grouped_execution_config_for_read_admission(policy, &prepared_plan) {
        None => Ok(prepared_plan),
        Some(capped_execution) => prepared_plan
            .with_grouped_execution_config(capped_execution)
            .map_err(QueryError::execute),
    }
}
/// Derives the grouped execution limits to use under `policy`.
///
/// Returns `None` when the policy has no grouped execution config or the
/// logical plan has no grouped plan. Otherwise each hard limit is the smaller
/// of the plan's value and the policy's value.
#[cfg(test)]
fn grouped_execution_config_for_read_admission(
    policy: &QueryAdmissionPolicy,
    prepared_plan: &SharedPreparedExecutionPlan,
) -> Option<GroupedExecutionConfig> {
    let policy_limits = policy.grouped().execution_config()?;
    let plan_limits = prepared_plan
        .logical_plan()
        .grouped_plan()?
        .group
        .execution;

    let max_groups = plan_limits.max_groups().min(policy_limits.max_groups());
    let max_group_bytes = plan_limits
        .max_group_bytes()
        .min(policy_limits.max_group_bytes());

    Some(GroupedExecutionConfig::with_hard_limits(
        max_groups,
        max_group_bytes,
    ))
}
/// Rejects a statement result whose estimated encoded size exceeds the
/// policy's response byte limit.
///
/// A policy without a byte limit accepts every result. A limit that does not
/// fit in `usize` is treated as `usize::MAX`.
///
/// # Errors
/// Returns a `QueryError` built from
/// `QueryReadAdmissionCode::ProjectionResponseMayExceedLimit` when the limit
/// is exceeded, and propagates size-estimation errors.
#[cfg(test)]
pub(super) fn enforce_sql_read_response_byte_policy(
    policy: &QueryAdmissionPolicy,
    result: &SqlStatementResult,
) -> Result<(), QueryError> {
    let byte_limit = match policy.max_response_bytes() {
        Some(limit) => usize::try_from(limit.get()).unwrap_or(usize::MAX),
        None => return Ok(()),
    };

    if !sql_read_projection_response_len_exceeds_max(result, byte_limit)? {
        return Ok(());
    }

    Err(QueryError::from(
        icydb_diagnostic_code::QueryReadAdmissionCode::ProjectionResponseMayExceedLimit,
    ))
}
/// Reports whether the estimated encoded size of `result` exceeds
/// `max_response_bytes`.
///
/// For projection and grouped results the estimate starts from the encoded
/// length of the response with an empty row list, then rows are added one at
/// a time by the matching row helper. Every other result variant yields
/// `Ok(false)`.
///
/// # Errors
/// Propagates encoding errors from the size helpers.
#[cfg(test)]
fn sql_read_projection_response_len_exceeds_max(
    result: &SqlStatementResult,
    max_response_bytes: usize,
) -> Result<bool, QueryError> {
    match result {
        SqlStatementResult::Projection {
            columns,
            fixed_scales,
            rows,
            row_count,
        } => encoded_sql_read_projection_response_len(
            columns.clone(),
            fixed_scales.clone(),
            Vec::new(),
            *row_count,
        )
        .and_then(|base_len| {
            encoded_sql_read_projection_rows_len_exceeds_max(base_len, max_response_bytes, rows)
        }),
        SqlStatementResult::Grouped {
            columns,
            fixed_scales,
            rows,
            row_count,
            next_cursor,
        } => encoded_sql_read_grouped_response_len(
            columns.clone(),
            fixed_scales.clone(),
            Vec::new(),
            *row_count,
            next_cursor.clone(),
        )
        .and_then(|base_len| {
            encoded_sql_read_grouped_rows_len_exceeds_max(base_len, max_response_bytes, rows)
        }),
        // Other result shapes are not size-checked here.
        _ => Ok(false),
    }
}
/// Adds the encoded length of each projection row to `estimated_payload_len`
/// and reports whether the running total exceeds `max_response_bytes`.
///
/// Rows are encoded one at a time and encoding stops as soon as the total is
/// over the limit. Lengths are accumulated with saturating addition.
/// NOTE(review): each row is encoded as a standalone Candid message, so the
/// sum presumably over-estimates the size of one combined response — confirm
/// that this conservative estimate is intended.
///
/// # Errors
/// An encoding failure is reported as
/// `InternalError::query_executor_invariant()` wrapped with
/// `QueryError::execute`.
#[cfg(test)]
fn encoded_sql_read_projection_rows_len_exceeds_max(
    estimated_payload_len: usize,
    max_response_bytes: usize,
    rows: &[Vec<OutputValue>],
) -> Result<bool, QueryError> {
    let mut running_total = estimated_payload_len;

    for row in rows {
        // Already over the limit: no need to encode further rows.
        if running_total > max_response_bytes {
            break;
        }
        let encoded_row = Encode!(row)
            .map_err(|_| QueryError::execute(InternalError::query_executor_invariant()))?;
        running_total = running_total.saturating_add(encoded_row.len());
    }

    Ok(running_total > max_response_bytes)
}
/// Adds the encoded length of each grouped row to `estimated_payload_len` and
/// reports whether the running total exceeds `max_response_bytes`.
///
/// Each row is converted to its size probe and encoded on its own; encoding
/// stops as soon as the total is over the limit. Lengths are accumulated with
/// saturating addition.
///
/// # Errors
/// An encoding failure is reported as
/// `InternalError::query_executor_invariant()` wrapped with
/// `QueryError::execute`.
#[cfg(test)]
fn encoded_sql_read_grouped_rows_len_exceeds_max(
    estimated_payload_len: usize,
    max_response_bytes: usize,
    rows: &[GroupedRow],
) -> Result<bool, QueryError> {
    let mut running_total = estimated_payload_len;

    for row in rows {
        // Already over the limit: no need to encode further rows.
        if running_total > max_response_bytes {
            break;
        }
        let encoded_row = Encode!(&grouped_row_size_probe(row))
            .map_err(|_| QueryError::execute(InternalError::query_executor_invariant()))?;
        running_total = running_total.saturating_add(encoded_row.len());
    }

    Ok(running_total > max_response_bytes)
}
/// Copies a grouped row's key and aggregate values into an owned size probe.
#[cfg(test)]
fn grouped_row_size_probe(row: &GroupedRow) -> SqlReadGroupedRowSizeProbe {
    let group_key = row.group_key().to_vec();
    let aggregate_values = row.aggregate_values().to_vec();

    SqlReadGroupedRowSizeProbe {
        group_key,
        aggregate_values,
    }
}
/// Returns the Candid-encoded byte length of a projection response probe
/// built from the given fields.
///
/// # Errors
/// An encoding failure is reported as
/// `InternalError::query_executor_invariant()` wrapped with
/// `QueryError::execute`.
#[cfg(test)]
fn encoded_sql_read_projection_response_len(
    columns: Vec<String>,
    fixed_scales: Vec<Option<u32>>,
    rows: Vec<Vec<OutputValue>>,
    row_count: u32,
) -> Result<usize, QueryError> {
    let projection = SqlReadProjectionSizeProbe {
        columns,
        fixed_scales,
        rows,
        row_count,
    };
    let probe = SqlReadResponseSizeProbe::Projection(projection);

    Encode!(&probe)
        .map(|bytes| bytes.len())
        .map_err(|_| QueryError::execute(InternalError::query_executor_invariant()))
}
/// Returns the Candid-encoded byte length of a grouped response probe built
/// from the given fields.
///
/// # Errors
/// An encoding failure is reported as
/// `InternalError::query_executor_invariant()` wrapped with
/// `QueryError::execute`.
#[cfg(test)]
fn encoded_sql_read_grouped_response_len(
    columns: Vec<String>,
    fixed_scales: Vec<Option<u32>>,
    rows: Vec<SqlReadGroupedRowSizeProbe>,
    row_count: u32,
    next_cursor: Option<String>,
) -> Result<usize, QueryError> {
    let grouped = SqlReadGroupedSizeProbe {
        columns,
        fixed_scales,
        rows,
        row_count,
        next_cursor,
    };
    let probe = SqlReadResponseSizeProbe::Grouped(grouped);

    Encode!(&probe)
        .map(|bytes| bytes.len())
        .map_err(|_| QueryError::execute(InternalError::query_executor_invariant()))
}