use std::sync::Arc;
use crate::{
db::{
DbSession, MissingRowPolicy, QueryError,
executor::EntityAuthority,
session::sql::{
CompiledSqlCommand, SqlCompiledCommandCacheKey, SqlCompiledCommandSurface,
compile::{SqlCompileArtifacts, SqlCompilePhaseAttribution, SqlQueryShape},
measured,
},
sql::{
lowering::{
PreparedSqlStatement, bind_lowered_sql_delete_query_structural,
bind_lowered_sql_select_query_structural,
compile_sql_global_aggregate_command_core_from_prepared,
extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
lower_sql_command_from_prepared_statement, prepare_sql_statement,
},
parser::SqlStatement,
},
},
model::entity::EntityModel,
traits::CanisterKind,
};
impl<C: CanisterKind> DbSession<C> {
/// Routes one parsed SQL statement to the compile routine for its variant.
///
/// The entity model is read from `authority` up front and handed to every
/// per-statement routine except `SHOW ENTITIES`, which takes no inputs and
/// cannot fail. Only the `SELECT` arm receives `compiled_cache_key`.
///
/// # Errors
///
/// Returns whatever `QueryError` the selected compile routine produces.
fn compile_sql_statement_core(
    statement: &SqlStatement,
    authority: EntityAuthority,
    compiled_cache_key: SqlCompiledCommandCacheKey,
) -> Result<SqlCompileArtifacts, QueryError> {
    let entity_model = authority.model();

    // There is deliberately no wildcard arm here: each variant is named.
    match statement {
        // Metadata statements.
        SqlStatement::ShowEntities(_) => Ok(Self::compile_show_entities()),
        SqlStatement::ShowColumns(_) => Self::compile_show_columns(statement, entity_model),
        SqlStatement::ShowIndexes(_) => Self::compile_show_indexes(statement, entity_model),
        SqlStatement::Describe(_) => Self::compile_describe(statement, entity_model),
        SqlStatement::Explain(_) => Self::compile_explain(statement, entity_model),
        // Mutations.
        SqlStatement::Update(_) => Self::compile_update(statement, entity_model),
        SqlStatement::Insert(_) => Self::compile_insert(statement, entity_model),
        SqlStatement::Delete(_) => Self::compile_delete(statement, entity_model),
        // Reads.
        SqlStatement::Select(_) => {
            Self::compile_select(statement, entity_model, compiled_cache_key)
        }
    }
}
/// Prepares `statement` against the entity named by `model`, under `measured`.
///
/// On success the result pairs the `u64` reported by `measured` with the
/// prepared statement. The whole preparation call, including the
/// `model.name()` lookup and the error mapping, runs inside the measured
/// closure.
///
/// # Errors
///
/// A lowering error from `prepare_sql_statement` is converted with
/// `QueryError::from_sql_lowering_error`.
fn prepare_statement_for_model(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<(u64, PreparedSqlStatement), QueryError> {
    let prepare = || {
        prepare_sql_statement(statement, model.name())
            .map_err(QueryError::from_sql_lowering_error)
    };

    measured(prepare)
}
/// Compiles a `SELECT` statement, choosing between the global-aggregate lane
/// and the ordinary row-reading lane.
///
/// The statement is prepared first; the lane decision is then taken from
/// `is_global_aggregate_lane_shape()` on the prepared statement, and that
/// check is itself run under `measured`. Both measurements are forwarded to
/// whichever lane-specific routine handles the rest of the compile.
///
/// # Errors
///
/// Propagates preparation failures and any error from the chosen lane.
fn compile_select(
    statement: &SqlStatement,
    model: &'static EntityModel,
    compiled_cache_key: SqlCompiledCommandCacheKey,
) -> Result<SqlCompileArtifacts, QueryError> {
    let (prepare_cost, prepared) = Self::prepare_statement_for_model(statement, model)?;

    // The shape check cannot fail on its own; it is wrapped in `Ok` only so
    // that it can be routed through `measured`.
    let (lane_check_cost, is_global_aggregate) =
        measured(|| Ok(prepared.statement().is_global_aggregate_lane_shape()))?;

    if !is_global_aggregate {
        return Self::compile_select_non_aggregate(
            prepared,
            model,
            compiled_cache_key,
            lane_check_cost,
            prepare_cost,
        );
    }

    Self::compile_select_global_aggregate(prepared, model, lane_check_cost, prepare_cost)
}
/// Finishes compiling a `SELECT` that was classified as a global aggregate.
///
/// The prepared statement is turned into an aggregate command under
/// `measured`, using `MissingRowPolicy::Ignore`. The resulting artifacts
/// carry `SqlQueryShape::read_rows(true)`, the two measurements supplied by
/// the caller, the measurement taken here, and a literal `0` in the last
/// slot (no separate bind step happens on this lane).
///
/// # Errors
///
/// A lowering error is converted with `QueryError::from_sql_lowering_error`.
fn compile_select_global_aggregate(
    prepared: PreparedSqlStatement,
    model: &'static EntityModel,
    aggregate_lane_check_local_instructions: u64,
    prepare_local_instructions: u64,
) -> Result<SqlCompileArtifacts, QueryError> {
    let (lower_cost, aggregate_command) = measured(|| {
        let compiled = compile_sql_global_aggregate_command_core_from_prepared(
            prepared,
            model,
            MissingRowPolicy::Ignore,
        );
        compiled.map_err(QueryError::from_sql_lowering_error)
    })?;

    let compiled_command = CompiledSqlCommand::GlobalAggregate {
        command: Box::new(aggregate_command),
    };
    let shape = SqlQueryShape::read_rows(true);

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        aggregate_lane_check_local_instructions,
        prepare_local_instructions,
        lower_cost,
        0,
    ))
}
/// Finishes compiling a `SELECT` that is not a global aggregate.
///
/// Two steps run here, each under its own `measured` call: lowering the
/// prepared statement, then binding the lowered select into a structural
/// query with `MissingRowPolicy::Ignore`. The bound query is wrapped in an
/// `Arc` and stored next to `compiled_cache_key` in the `Select` command;
/// the artifacts carry `SqlQueryShape::read_rows(false)`.
///
/// # Errors
///
/// Lowering and binding errors are both converted with
/// `QueryError::from_sql_lowering_error`.
fn compile_select_non_aggregate(
    prepared: PreparedSqlStatement,
    model: &'static EntityModel,
    compiled_cache_key: SqlCompiledCommandCacheKey,
    aggregate_lane_check_local_instructions: u64,
    prepare_local_instructions: u64,
) -> Result<SqlCompileArtifacts, QueryError> {
    // Step 1: lower.
    let (lower_cost, lowered_select) = measured(|| {
        let lowered = lower_prepared_sql_select_statement(prepared, model);
        lowered.map_err(QueryError::from_sql_lowering_error)
    })?;

    // Step 2: bind.
    let (bind_cost, bound_query) = measured(|| {
        let bound = bind_lowered_sql_select_query_structural(
            model,
            lowered_select,
            MissingRowPolicy::Ignore,
        );
        bound.map_err(QueryError::from_sql_lowering_error)
    })?;

    let compiled_command = CompiledSqlCommand::Select {
        query: Arc::new(bound_query),
        compiled_cache_key,
    };
    let shape = SqlQueryShape::read_rows(false);

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        aggregate_lane_check_local_instructions,
        prepare_local_instructions,
        lower_cost,
        bind_cost,
    ))
}
/// Compiles a `DELETE` statement into a `CompiledSqlCommand::Delete`.
///
/// Preparation, lowering and binding are each run under `measured`. Between
/// lowering and binding (and outside any measured closure) the optional
/// `RETURNING` part is cloned off the lowered delete and the lowered delete
/// is consumed into its base query. Binding uses
/// `MissingRowPolicy::Ignore`; the bind call itself returns a plain value and
/// is wrapped in `Ok` only to go through `measured`.
///
/// The artifacts carry `SqlQueryShape::mutation(..)` keyed on whether a
/// `RETURNING` part is present, and a literal `0` in the aggregate lane-check
/// slot.
///
/// # Errors
///
/// Preparation and lowering errors are converted with
/// `QueryError::from_sql_lowering_error`.
fn compile_delete(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    let (prepare_cost, prepared) = Self::prepare_statement_for_model(statement, model)?;

    let (lower_cost, lowered_delete) = measured(|| {
        lower_prepared_sql_delete_statement(prepared)
            .map_err(QueryError::from_sql_lowering_error)
    })?;

    // Take what is needed from the lowered delete before it is consumed.
    let returning_part = lowered_delete.returning().cloned();
    let shape = SqlQueryShape::mutation(returning_part.is_some());
    let base_query = lowered_delete.into_base_query();

    let (bind_cost, bound_query) = measured(|| {
        let bound = bind_lowered_sql_delete_query_structural(
            model,
            base_query,
            MissingRowPolicy::Ignore,
        );
        Ok(bound)
    })?;

    let compiled_command = CompiledSqlCommand::Delete {
        query: Arc::new(bound_query),
        returning: returning_part,
    };

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        lower_cost,
        bind_cost,
    ))
}
/// Compiles an `INSERT` statement into a `CompiledSqlCommand::Insert`.
///
/// Only preparation is measured. The insert statement is then extracted from
/// the prepared form outside any `measured` call, so the lowering and bind
/// slots of the artifacts are literal zeros, as is the aggregate lane-check
/// slot. The shape is `SqlQueryShape::mutation(..)` keyed on whether the
/// extracted statement has a `returning` value.
///
/// # Errors
///
/// Preparation and extraction errors are converted with
/// `QueryError::from_sql_lowering_error`.
fn compile_insert(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    let (prepare_cost, prepared) = Self::prepare_statement_for_model(statement, model)?;

    let insert = extract_prepared_sql_insert_statement(prepared)
        .map_err(QueryError::from_sql_lowering_error)?;

    // Read the flag before `insert` is moved into the command.
    let has_returning = insert.returning.is_some();
    let shape = SqlQueryShape::mutation(has_returning);
    let compiled_command = CompiledSqlCommand::Insert(insert);

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        0,
        0,
    ))
}
/// Compiles an `UPDATE` statement into a `CompiledSqlCommand::Update`.
///
/// Only preparation is measured. The update statement is then extracted from
/// the prepared form outside any `measured` call, so the lowering and bind
/// slots of the artifacts are literal zeros, as is the aggregate lane-check
/// slot. The shape is `SqlQueryShape::mutation(..)` keyed on whether the
/// extracted statement has a `returning` value.
///
/// # Errors
///
/// Preparation and extraction errors are converted with
/// `QueryError::from_sql_lowering_error`.
fn compile_update(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    let (prepare_cost, prepared) = Self::prepare_statement_for_model(statement, model)?;

    let update = extract_prepared_sql_update_statement(prepared)
        .map_err(QueryError::from_sql_lowering_error)?;

    // Read the flag before `update` is moved into the command.
    let has_returning = update.returning.is_some();
    let shape = SqlQueryShape::mutation(has_returning);
    let compiled_command = CompiledSqlCommand::Update(update);

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        0,
        0,
    ))
}
/// Compiles an `EXPLAIN` statement into a `CompiledSqlCommand::Explain`.
///
/// The statement is prepared and then lowered with
/// `lower_sql_command_from_prepared_statement`, each under `measured`. The
/// lowered command is boxed into the `Explain` variant. The artifacts use
/// `SqlQueryShape::metadata()` with literal zeros in the aggregate lane-check
/// and bind slots.
///
/// # Errors
///
/// Preparation and lowering errors are converted with
/// `QueryError::from_sql_lowering_error`.
fn compile_explain(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    let (prepare_cost, prepared) = Self::prepare_statement_for_model(statement, model)?;

    let (lower_cost, lowered_command) = measured(|| {
        let lowered = lower_sql_command_from_prepared_statement(prepared, model);
        lowered.map_err(QueryError::from_sql_lowering_error)
    })?;

    let compiled_command = CompiledSqlCommand::Explain(Box::new(lowered_command));
    let shape = SqlQueryShape::metadata();

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        lower_cost,
        0,
    ))
}
/// Compiles a `DESCRIBE` statement into `CompiledSqlCommand::DescribeEntity`.
///
/// The statement is still prepared against `model`, so a preparation failure
/// surfaces as an error, but the prepared value itself is not used. Only the
/// preparation measurement is recorded; every other numeric slot is zero and
/// the shape is `SqlQueryShape::metadata()`.
///
/// # Errors
///
/// Propagates the error from `prepare_statement_for_model`.
fn compile_describe(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    // The prepared value is kept as a named (unused) binding.
    let (prepare_cost, _validated) = Self::prepare_statement_for_model(statement, model)?;

    let compiled_command = CompiledSqlCommand::DescribeEntity;
    let shape = SqlQueryShape::metadata();

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        0,
        0,
    ))
}
/// Compiles a `SHOW INDEXES` statement into
/// `CompiledSqlCommand::ShowIndexesEntity`.
///
/// The statement is still prepared against `model`, so a preparation failure
/// surfaces as an error, but the prepared value itself is not used. Only the
/// preparation measurement is recorded; every other numeric slot is zero and
/// the shape is `SqlQueryShape::metadata()`.
///
/// # Errors
///
/// Propagates the error from `prepare_statement_for_model`.
fn compile_show_indexes(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    // The prepared value is kept as a named (unused) binding.
    let (prepare_cost, _validated) = Self::prepare_statement_for_model(statement, model)?;

    let compiled_command = CompiledSqlCommand::ShowIndexesEntity;
    let shape = SqlQueryShape::metadata();

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        0,
        0,
    ))
}
/// Compiles a `SHOW COLUMNS` statement into
/// `CompiledSqlCommand::ShowColumnsEntity`.
///
/// The statement is still prepared against `model`, so a preparation failure
/// surfaces as an error, but the prepared value itself is not used. Only the
/// preparation measurement is recorded; every other numeric slot is zero and
/// the shape is `SqlQueryShape::metadata()`.
///
/// # Errors
///
/// Propagates the error from `prepare_statement_for_model`.
fn compile_show_columns(
    statement: &SqlStatement,
    model: &'static EntityModel,
) -> Result<SqlCompileArtifacts, QueryError> {
    // The prepared value is kept as a named (unused) binding.
    let (prepare_cost, _validated) = Self::prepare_statement_for_model(statement, model)?;

    let compiled_command = CompiledSqlCommand::ShowColumnsEntity;
    let shape = SqlQueryShape::metadata();

    Ok(SqlCompileArtifacts::new(
        compiled_command,
        shape,
        0,
        prepare_cost,
        0,
        0,
    ))
}
/// Builds the artifacts for `SHOW ENTITIES`.
///
/// This takes no statement or model and does no measured work: the command
/// is `CompiledSqlCommand::ShowEntities`, the shape is
/// `SqlQueryShape::metadata()`, and all four numeric slots are zero.
fn compile_show_entities() -> SqlCompileArtifacts {
    let compiled_command = CompiledSqlCommand::ShowEntities;
    let shape = SqlQueryShape::metadata();

    SqlCompileArtifacts::new(compiled_command, shape, 0, 0, 0, 0)
}
/// Checks that `statement` is supported on `surface`, then compiles it.
///
/// The surface check runs first; compilation is attempted only when it
/// succeeds.
///
/// # Errors
///
/// Returns the error from `ensure_sql_statement_supported_for_surface` if the
/// check fails, otherwise any error from `compile_sql_statement_core`.
fn compile_sql_statement_entry(
    statement: &SqlStatement,
    surface: SqlCompiledCommandSurface,
    authority: EntityAuthority,
    compiled_cache_key: SqlCompiledCommandCacheKey,
) -> Result<SqlCompileArtifacts, QueryError> {
    let surface_gate = Self::ensure_sql_statement_supported_for_surface(statement, surface);
    surface_gate?;

    Self::compile_sql_statement_core(statement, authority, compiled_cache_key)
}
/// Compiles `statement` for `surface` and returns the artifacts together with
/// their phase attribution.
///
/// After compilation, two invariants are checked with `debug_assert!` (so
/// only in builds with debug assertions enabled):
///
/// * artifacts whose shape is aggregate report a bind value of zero;
/// * artifacts whose shape is a mutation report an aggregate lane-check
///   value of zero.
///
/// The attribution is taken from `artifacts.phase_attribution()` before the
/// artifacts are moved into the returned tuple.
///
/// # Errors
///
/// Propagates any error from `compile_sql_statement_entry`.
pub(in crate::db::session::sql) fn compile_sql_statement_measured(
    statement: &SqlStatement,
    surface: SqlCompiledCommandSurface,
    authority: EntityAuthority,
    compiled_cache_key: SqlCompiledCommandCacheKey,
) -> Result<(SqlCompileArtifacts, SqlCompilePhaseAttribution), QueryError> {
    let compiled =
        Self::compile_sql_statement_entry(statement, surface, authority, compiled_cache_key)?;

    // Aggregate shape together with non-zero bind work is the forbidden combination.
    debug_assert!(
        !(compiled.shape.is_aggregate && compiled.bind != 0),
        "aggregate SQL artifacts must not report scalar bind work"
    );
    // Mutation shape together with a non-zero lane check is the forbidden combination.
    debug_assert!(
        !(compiled.shape.is_mutation && compiled.aggregate_lane_check != 0),
        "mutation SQL artifacts must not report SELECT lane checks"
    );

    let phase_attribution = compiled.phase_attribution();
    Ok((compiled, phase_attribution))
}
}