mod attribution;
mod cache;
mod compile;
mod compile_cache;
mod compiled;
mod execute;
mod projection;
mod result;
#[cfg(feature = "diagnostics")]
use crate::db::DataStore;
#[cfg(feature = "diagnostics")]
use crate::db::executor::{
current_pure_covering_decode_local_instructions,
current_pure_covering_row_assembly_local_instructions,
};
#[cfg(test)]
use crate::db::sql::parser::parse_sql;
#[cfg(feature = "diagnostics")]
use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
use crate::{
db::{
DbSession, PersistedRow, QueryError,
executor::{EntityAuthority, SharedPreparedExecutionPlan},
query::intent::StructuralQuery,
schema::{AcceptedSchemaSnapshot, SchemaInfo},
schema::{
execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
execute_sql_ddl_field_default_change, execute_sql_ddl_field_nullability_change,
execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
},
session::query::QueryPlanCacheAttribution,
session::sql::projection::{
projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
},
sql::{
ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
},
},
traits::{CanisterKind, EntityValue, Path},
};
pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
#[cfg(feature = "diagnostics")]
pub use attribution::{
SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
};
pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
pub(in crate::db::session::sql) use cache::{
SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
};
pub(in crate::db::session::sql) use compile::{
SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
};
pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
pub use result::SqlStatementResult;
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
#[cfg(feature = "diagnostics")]
pub use crate::db::session::sql::projection::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
/// Test-only helper that parses `sql` with `parse_sql` and converts a parser
/// failure into a `QueryError` through `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
/// Parses `sql` and reports the entity name the statement targets.
///
/// Returns `Ok(None)` when the parsed statement does not name an entity.
///
/// # Errors
///
/// Returns the parser failure converted with `QueryError::from_sql_parse_error`.
#[doc(hidden)]
pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
    let parsed = parse_sql_with_attribution(sql);
    // Only the statement is needed here; the second tuple element is discarded.
    let (statement, _) = parsed.map_err(QueryError::from_sql_parse_error)?;
    let entity = sql_statement_entity_name_from_statement(&statement);

    Ok(entity.map(String::from))
}
/// Returns the entity name carried by a parsed SQL statement, borrowed from it.
///
/// `SHOW ENTITIES` yields `None`, as does a `DROP INDEX` whose `entity` is
/// `None`. Every other statement form yields its `entity` field.
const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
    match statement {
        SqlStatement::Select(select) => Some(select.entity.as_str()),
        SqlStatement::Delete(delete) => Some(delete.entity.as_str()),
        SqlStatement::Insert(insert) => Some(insert.entity.as_str()),
        SqlStatement::Update(update) => Some(update.entity.as_str()),
        SqlStatement::Ddl(ddl) => match ddl {
            SqlDdlStatement::CreateIndex(create_index) => Some(create_index.entity.as_str()),
            SqlDdlStatement::DropIndex(drop_index) => {
                // Written as plain pattern matching so the body stays valid in a `const fn`.
                if let Some(entity) = &drop_index.entity {
                    Some(entity.as_str())
                } else {
                    None
                }
            }
            SqlDdlStatement::AlterTableAddColumn(add_column) => Some(add_column.entity.as_str()),
            SqlDdlStatement::AlterTableAlterColumn(alter_column) => {
                Some(alter_column.entity.as_str())
            }
            SqlDdlStatement::AlterTableDropColumn(drop_column) => {
                Some(drop_column.entity.as_str())
            }
        },
        // EXPLAIN reports the entity of the statement it wraps.
        SqlStatement::Explain(explain) => match &explain.statement {
            SqlExplainTarget::Select(select) => Some(select.entity.as_str()),
            SqlExplainTarget::Delete(delete) => Some(delete.entity.as_str()),
        },
        SqlStatement::Describe(describe) => Some(describe.entity.as_str()),
        SqlStatement::ShowIndexes(show_indexes) => Some(show_indexes.entity.as_str()),
        SqlStatement::ShowColumns(show_columns) => Some(show_columns.entity.as_str()),
        SqlStatement::ShowEntities(_) => None,
    }
}
/// Runs `stage` under `measure_sql_stage` and pairs the measured instruction
/// count with the stage's success value.
///
/// # Errors
///
/// Returns the stage's own error unchanged; the measurement is dropped in that case.
fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
    let (local_instructions, outcome) = measure_sql_stage(stage);

    outcome.map(|value| (local_instructions, value))
}
impl<C: CanisterKind> DbSession<C> {
/// Looks up the shared prepared plan for `query` under an already accepted
/// authority and schema snapshot, then derives the SQL projection contract
/// and cache attribution for that plan.
///
/// # Errors
///
/// Propagates the failure from `cached_shared_query_plan_for_accepted_authority`.
fn sql_select_prepared_plan_for_accepted_authority(
    &self,
    query: &StructuralQuery,
    authority: EntityAuthority,
    accepted_schema: &AcceptedSchemaSnapshot,
) -> Result<
    (
        SharedPreparedExecutionPlan,
        SqlProjectionContract,
        SqlCacheAttribution,
    ),
    QueryError,
> {
    // The lookup takes the authority by value, while the projection step
    // below still needs it, hence the clone.
    let lookup = self.cached_shared_query_plan_for_accepted_authority(
        authority.clone(),
        accepted_schema,
        query,
    );
    let (plan, plan_cache_attribution) = lookup?;

    let resolved =
        Self::sql_select_projection_from_prepared_plan(plan, authority, plan_cache_attribution);
    Ok(resolved)
}
/// Resolves the accepted schema and authority for entity `E`, then delegates
/// to `sql_select_prepared_plan_for_accepted_authority`.
///
/// # Errors
///
/// Returns the `accepted_entity_authority` failure wrapped by
/// `QueryError::execute`, or whatever the delegated lookup returns.
fn sql_select_prepared_plan_for_entity<E>(
    &self,
    query: &StructuralQuery,
) -> Result<
    (
        SharedPreparedExecutionPlan,
        SqlProjectionContract,
        SqlCacheAttribution,
    ),
    QueryError,
>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let resolved = self.accepted_entity_authority::<E>();
    let (schema_snapshot, entity_authority) = resolved.map_err(QueryError::execute)?;

    self.sql_select_prepared_plan_for_accepted_authority(query, entity_authority, &schema_snapshot)
}
/// Builds the SQL projection contract for a prepared plan and converts the
/// query-plan cache attribution into its SQL counterpart.
///
/// The projection spec comes from the plan's logical plan and the authority's
/// model; its labels and fixed scales form the `SqlProjectionContract`. The
/// prepared plan is handed back unchanged as the first tuple element.
fn sql_select_projection_from_prepared_plan(
    prepared_plan: SharedPreparedExecutionPlan,
    authority: EntityAuthority,
    cache_attribution: QueryPlanCacheAttribution,
) -> (
    SharedPreparedExecutionPlan,
    SqlProjectionContract,
    SqlCacheAttribution,
) {
    let spec = prepared_plan
        .logical_plan()
        .projection_spec(authority.model());

    let labels = projection_labels_from_projection_spec(&spec);
    let fixed_scales = projection_fixed_scales_from_projection_spec(&spec);
    let contract = SqlProjectionContract::new(labels, fixed_scales);

    let sql_cache_attribution =
        SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution);

    (prepared_plan, contract, sql_cache_attribution)
}
/// Checks that `statement` may run on the given compiled-command `surface`.
///
/// The query surface accepts `SELECT`, `EXPLAIN`, `DESCRIBE`, `SHOW INDEXES`,
/// `SHOW COLUMNS` and `SHOW ENTITIES`; the update surface accepts `INSERT`,
/// `UPDATE` and `DELETE`. DDL is rejected on both surfaces.
///
/// # Errors
///
/// Returns `QueryError::unsupported_query` with a message naming the rejected
/// statement kind and, for non-DDL statements, the entry point to use instead.
fn ensure_sql_statement_supported_for_surface(
    statement: &SqlStatement,
    surface: SqlCompiledCommandSurface,
) -> Result<(), QueryError> {
    // Supported pairs return early; every other pair resolves to the
    // rejection message reported to the caller.
    let rejection = match (surface, statement) {
        (
            SqlCompiledCommandSurface::Query,
            SqlStatement::Select(_)
            | SqlStatement::Explain(_)
            | SqlStatement::Describe(_)
            | SqlStatement::ShowIndexes(_)
            | SqlStatement::ShowColumns(_)
            | SqlStatement::ShowEntities(_),
        )
        | (
            SqlCompiledCommandSurface::Update,
            SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
        ) => return Ok(()),

        (_, SqlStatement::Ddl(_)) => "SQL DDL execution is not supported in this release",

        // Mutations sent to the query surface.
        (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
            "execute_sql_query rejects INSERT; use execute_sql_update::<E>()"
        }
        (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
            "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()"
        }
        (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
            "execute_sql_query rejects DELETE; use execute_sql_update::<E>()"
        }

        // Read-only statements sent to the update surface.
        (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
            "execute_sql_update rejects SELECT; use execute_sql_query::<E>()"
        }
        (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
            "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()"
        }
        (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
            "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()"
        }
        (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
            "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()"
        }
        (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
            "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()"
        }
        (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
            "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()"
        }
    };

    Err(QueryError::unsupported_query(rejection))
}
/// Compiles `sql` through `compile_sql_query` for entity `E` and executes
/// the compiled command, handing ownership of it to the executor.
///
/// # Errors
///
/// Returns the compile error if compilation fails, otherwise whatever
/// `execute_compiled_sql_owned` returns.
pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_compiled_sql_owned::<E>(self.compile_sql_query::<E>(sql)?)
}
/// Diagnostics-only variant of SQL query execution that also returns a
/// `SqlQueryExecutionAttribution` describing where local instructions went.
///
/// The compile step is wrapped in `measure_sql_stage`; execution is broken
/// down by the phase attribution returned from
/// `execute_compiled_sql_with_phase_attribution`. Store `get` calls and the
/// two pure-covering counters are reported as before/after deltas around
/// the execute call.
///
/// # Errors
///
/// Returns the compile error or the execution error unchanged; no
/// attribution is returned in either case.
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
pub fn execute_sql_query_with_attribution<E>(
&self,
sql: &str,
) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
// Measure the whole compile call first; its error (if any) is surfaced
// only after the measurement has completed.
let (compile_local_instructions, compiled) =
measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
// Snapshot the counters before execution so that only this call's
// contribution is reported.
let store_get_calls_before = DataStore::current_get_call_count();
let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
let pure_covering_row_assembly_before =
current_pure_covering_row_assembly_local_instructions();
let (result, execute_cache_attribution, execute_phase_attribution) =
self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
// Deltas use saturating subtraction, so a counter that reads lower than
// its snapshot yields 0 instead of underflowing.
let store_get_calls =
DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
let pure_covering_decode_local_instructions =
current_pure_covering_decode_local_instructions()
.saturating_sub(pure_covering_decode_before);
let pure_covering_row_assembly_local_instructions =
current_pure_covering_row_assembly_local_instructions()
.saturating_sub(pure_covering_row_assembly_before);
// Execute total = planner + store + executor + response finalization.
// `executor_invocation_local_instructions` is reported below but is not
// part of this sum.
let execute_local_instructions = execute_phase_attribution
.planner_local_instructions
.saturating_add(execute_phase_attribution.store_local_instructions)
.saturating_add(execute_phase_attribution.executor_local_instructions)
.saturating_add(execute_phase_attribution.response_finalization_local_instructions);
let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
let total_local_instructions =
compile_local_instructions.saturating_add(execute_local_instructions);
// Grouped attribution is attached only when the statement produced a
// grouped result.
let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
GroupedExecutionAttribution {
stream_local_instructions: execute_phase_attribution
.grouped_stream_local_instructions,
fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
finalize_local_instructions: execute_phase_attribution
.grouped_finalize_local_instructions,
count: GroupedCountAttribution::from_executor(
execute_phase_attribution.grouped_count,
),
},
);
// Pure-covering attribution is attached only when at least one of the
// two counters advanced during execution.
let pure_covering = (pure_covering_decode_local_instructions > 0
|| pure_covering_row_assembly_local_instructions > 0)
.then_some(SqlPureCoveringAttribution {
decode_local_instructions: pure_covering_decode_local_instructions,
row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
});
Ok((
result,
SqlQueryExecutionAttribution {
compile_local_instructions,
compile: SqlCompileAttribution {
cache_key_local_instructions: compile_phase_attribution.cache_key,
cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
parse_local_instructions: compile_phase_attribution.parse,
parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
parse_select_local_instructions: compile_phase_attribution.parse_select,
parse_expr_local_instructions: compile_phase_attribution.parse_expr,
parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
aggregate_lane_check_local_instructions: compile_phase_attribution
.aggregate_lane_check,
prepare_local_instructions: compile_phase_attribution.prepare,
lower_local_instructions: compile_phase_attribution.lower,
bind_local_instructions: compile_phase_attribution.bind,
cache_insert_local_instructions: compile_phase_attribution.cache_insert,
},
// Reported with the same value as `execution.planner_local_instructions`.
plan_lookup_local_instructions: execute_phase_attribution
.planner_local_instructions,
execution: SqlExecutionAttribution {
planner_local_instructions: execute_phase_attribution
.planner_local_instructions,
store_local_instructions: execute_phase_attribution.store_local_instructions,
executor_invocation_local_instructions: execute_phase_attribution
.executor_invocation_local_instructions,
executor_local_instructions: execute_phase_attribution
.executor_local_instructions,
response_finalization_local_instructions: execute_phase_attribution
.response_finalization_local_instructions,
},
grouped,
scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
execute_phase_attribution.scalar_aggregate_terminal,
),
pure_covering,
store_get_calls,
// This path does not measure response decoding; the field is always 0 here.
response_decode_local_instructions: 0,
execute_local_instructions,
total_local_instructions,
cache: SqlQueryCacheAttribution {
sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
sql_compiled_command_misses: cache_attribution
.sql_compiled_command_cache_misses,
shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
},
},
))
}
/// Compiles `sql` through `compile_sql_update` for entity `E` and executes
/// the compiled command, handing ownership of it to the executor.
///
/// # Errors
///
/// Returns the compile error if compilation fails, otherwise whatever
/// `execute_compiled_sql_owned` returns.
pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_compiled_sql_owned::<E>(self.compile_sql_update::<E>(sql)?)
}
/// Prepares a SQL DDL statement for entity `E` and returns a copy of its
/// preparation report. This method does not call any DDL execution function.
///
/// # Errors
///
/// Propagates any failure from `prepare_sql_ddl_command`.
pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // The accepted schema snapshot is not needed when only reporting.
    self.prepare_sql_ddl_command::<E>(sql)
        .map(|(_, prepared)| prepared.report().clone())
}
/// Parses `sql`, resolves the accepted schema for entity `E`, and prepares
/// the DDL statement against that schema.
///
/// Returns the accepted schema snapshot used for preparation together with
/// the prepared command.
///
/// # Errors
///
/// - the parse failure, converted with `QueryError::from_sql_parse_error`;
/// - the `accepted_entity_authority` failure, wrapped by `QueryError::execute`;
/// - a preparation failure, reported as `QueryError::unsupported_query` with
///   the underlying error embedded in the message.
fn prepare_sql_ddl_command<E>(
    &self,
    sql: &str,
) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let parsed = parse_sql_with_attribution(sql);
    let (statement, _) = parsed.map_err(QueryError::from_sql_parse_error)?;

    let resolved = self.accepted_entity_authority::<E>();
    let (accepted_schema, _) = resolved.map_err(QueryError::execute)?;

    let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
        E::MODEL,
        &accepted_schema,
        true,
    );

    let prepared =
        prepare_sql_ddl_statement(&statement, &accepted_schema, &schema_info, E::Store::PATH)
            .map_err(|err| {
                QueryError::unsupported_query(format!(
                    "SQL DDL preparation failed before execution: {err}"
                ))
            })?;

    Ok((accepted_schema, prepared))
}
/// Prepares and executes a SQL DDL statement for entity `E`.
///
/// A prepared command that does not mutate the schema returns its report
/// with status `NoOp` before the store is touched. Otherwise the bound
/// statement selects the matching schema helper, and the report comes back
/// with status `Published` plus `(rows_scanned, index_keys_written)` metrics.
/// Arms whose helper does not report a metric use 0 for it.
///
/// # Errors
///
/// - any failure from `prepare_sql_ddl_command`;
/// - `QueryError::unsupported_query` when the prepared command has no derivation;
/// - store recovery or schema helper failures, wrapped by `QueryError::execute`.
pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    use crate::db::sql::ddl::BoundSqlDdlStatement;

    let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;

    if !prepared.mutates_schema() {
        let noop_report = prepared
            .report()
            .clone()
            .with_execution_status(SqlDdlExecutionStatus::NoOp);
        return Ok(SqlStatementResult::Ddl(noop_report));
    }

    let derivation = match prepared.derivation() {
        Some(derivation) => derivation,
        None => {
            return Err(QueryError::unsupported_query(
                "SQL DDL execution could not find a prepared schema derivation".to_string(),
            ));
        }
    };

    let recovered = self.db.recovered_store(E::Store::PATH);
    let store = recovered.map_err(QueryError::execute)?;

    let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
        BoundSqlDdlStatement::AddColumn(_) => {
            execute_sql_ddl_field_addition(
                store,
                E::ENTITY_TAG,
                E::PATH,
                &accepted_before,
                derivation,
            )
            .map_err(QueryError::execute)?;
            (0, 0)
        }
        BoundSqlDdlStatement::AlterColumnDefault(_) => {
            execute_sql_ddl_field_default_change(
                store,
                E::ENTITY_TAG,
                E::PATH,
                &accepted_before,
                derivation,
            )
            .map_err(QueryError::execute)?;
            (0, 0)
        }
        BoundSqlDdlStatement::AlterColumnNullability(_) => {
            let scanned = execute_sql_ddl_field_nullability_change(
                store,
                E::ENTITY_TAG,
                E::PATH,
                &accepted_before,
                derivation,
            )
            .map_err(QueryError::execute)?;
            (scanned, 0)
        }
        // Field-path-only indexes and expression indexes use different
        // helpers; the helper's result is used directly as the metrics pair.
        BoundSqlDdlStatement::CreateIndex(create)
            if create.candidate_index().key().is_field_path_only() =>
        {
            execute_sql_ddl_field_path_index_addition(
                store,
                E::ENTITY_TAG,
                E::PATH,
                &accepted_before,
                derivation,
            )
            .map_err(QueryError::execute)?
        }
        BoundSqlDdlStatement::CreateIndex(_) => execute_sql_ddl_expression_index_addition(
            store,
            E::ENTITY_TAG,
            E::PATH,
            &accepted_before,
            derivation,
        )
        .map_err(QueryError::execute)?,
        BoundSqlDdlStatement::DropIndex(_) => {
            execute_sql_ddl_secondary_index_drop(
                store,
                E::ENTITY_TAG,
                E::PATH,
                &accepted_before,
                derivation,
            )
            .map_err(QueryError::execute)?;
            (0, 0)
        }
        BoundSqlDdlStatement::NoOp(_) => (0, 0),
    };

    let published_report = prepared
        .report()
        .clone()
        .with_execution_status(SqlDdlExecutionStatus::Published)
        .with_execution_metrics(rows_scanned, index_keys_written);

    Ok(SqlStatementResult::Ddl(published_report))
}
}