mod attribution;
mod cache;
mod compile;
mod compile_cache;
mod compiled;
mod delete_policy;
mod execute;
mod projection;
mod result;
mod update_policy;
mod write_policy;
#[cfg(feature = "diagnostics")]
use crate::db::diagnostics::StoreCounterSnapshot;
#[cfg(feature = "diagnostics")]
use crate::db::executor::{
current_pure_covering_decode_local_instructions,
current_pure_covering_row_assembly_local_instructions,
};
#[cfg(feature = "sql-explain")]
use crate::db::sql::parser::SqlExplainTarget;
#[cfg(test)]
use crate::db::sql::parser::parse_sql;
use crate::{
db::{
DbSession, PersistedRow, QueryError,
schema::AcceptedSchemaSnapshot,
schema::{
execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
execute_sql_ddl_field_default_change, execute_sql_ddl_field_drop,
execute_sql_ddl_field_nullability_change, execute_sql_ddl_field_path_index_addition,
execute_sql_ddl_field_rename, execute_sql_ddl_secondary_index_drop,
},
session::AcceptedSchemaCatalogContext,
sql::{
ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
parser::{SqlDdlStatement, SqlStatement, parse_sql_with_attribution},
},
},
traits::{CanisterKind, EntityValue, Path},
};
use icydb_diagnostic_code::{SqlLoweringCode, SqlSurfaceMismatchCode};
pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
#[cfg(feature = "diagnostics")]
pub(in crate::db::session::sql) use attribution::SqlQueryExecutionAttributionInputs;
#[cfg(feature = "diagnostics")]
pub use attribution::{
SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
};
pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
pub(in crate::db::session::sql) use cache::{
SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
};
pub(in crate::db::session::sql) use compile::{
SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
};
pub(in crate::db) use compiled::{
CompiledSqlCommand, CompiledSqlInsertCommand, SqlCompiledCommandExecutionContext,
SqlCompiledSchemaFingerprint, SqlGlobalAggregateCountPlanCacheEntry,
};
#[cfg(test)]
pub(in crate::db) use delete_policy::{
DEFAULT_PUBLIC_BOUNDED_DELETE_LIMIT, DEFAULT_PUBLIC_DELETE_RETURNING_RESPONSE_BYTES,
};
/// Test-only alias that exposes `write_policy::DEFAULT_PUBLIC_BOUNDED_WRITE_LIMIT` to the rest of
/// `crate::db` under an INSERT-specific name.
#[cfg(test)]
pub(in crate::db) const DEFAULT_PUBLIC_INSERT_STAGED_ROWS: u32 =
write_policy::DEFAULT_PUBLIC_BOUNDED_WRITE_LIMIT;
pub use delete_policy::{
SqlAdminBulkDeletePlan, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
SqlPublicBoundedDeletePlan, SqlPublicPrimaryKeyDeletePlan, SqlSessionCurrentDeletePlan,
SqlValidatedDeletePlan, classify_sql_delete_policy,
};
pub(in crate::db) use projection::SqlProjectionContract;
pub use result::SqlStatementResult;
#[cfg(test)]
pub(in crate::db) use update_policy::{
DEFAULT_PUBLIC_BOUNDED_UPDATE_LIMIT, DEFAULT_PUBLIC_UPDATE_RETURNING_RESPONSE_BYTES,
};
pub use update_policy::{
SqlAdminBulkUpdatePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyUpdatePlan,
SqlSessionCurrentUpdatePlan, SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy,
SqlUpdatePolicyContext, SqlUpdatePolicyRejection, SqlUpdatePolicyReport,
SqlUpdateStatementClassification, SqlValidatedUpdatePlan, classify_sql_update_policy,
};
pub(in crate::db::session::sql) use write_policy::combined_optional_row_bound;
pub use write_policy::{
SqlWriteExecutionBounds, SqlWriteOrderProof, SqlWriteReturningBounds, SqlWriteReturningShape,
SqlWriteStatementShape, SqlWriteWhereProof,
};
/// Two-way classification of a parsed SQL statement: DDL versus everything else.
///
/// Produced by `sql_statement_surface`, which reports every non-DDL statement (reads, writes,
/// and introspection alike) as [`SqlStatementSurface::Query`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[doc(hidden)]
pub enum SqlStatementSurface {
/// Any statement that is not DDL.
Query,
/// A DDL statement.
Ddl,
}
/// Three-way classification of a parsed SQL statement, produced by
/// `sql_statement_shell_surface`.
///
/// Unlike [`SqlStatementSurface`], this enum separates `UPDATE` statements from the other
/// non-DDL statements.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[doc(hidden)]
pub enum SqlStatementShellSurface {
/// Every statement that is neither DDL nor `UPDATE`.
Query,
/// A DDL statement.
Ddl,
/// An `UPDATE` statement.
Update,
}
/// Summary of one parsed SQL statement, as built by `sql_statement_dispatch`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[doc(hidden)]
pub struct SqlStatementDispatch {
// Entity named by the statement; `None` when the statement names no entity.
entity_name: Option<String>,
// Whether the statement was classified as an introspection statement.
requires_introspection: bool,
}
impl SqlStatementDispatch {
/// Builds a dispatch summary from its two already-derived parts.
#[must_use]
const fn new(entity_name: Option<String>, requires_introspection: bool) -> Self {
Self {
entity_name,
requires_introspection,
}
}
/// Returns the recorded entity name as a borrowed string, or `None` when none was recorded.
#[must_use]
pub fn entity_name(&self) -> Option<&str> {
self.entity_name.as_deref()
}
/// Returns the recorded introspection flag.
#[must_use]
pub const fn requires_introspection(&self) -> bool {
self.requires_introspection
}
}
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
#[cfg(feature = "diagnostics")]
pub use crate::db::session::sql::projection::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
/// Test-only helper: parses `sql` with `parse_sql` and maps a parse failure into a
/// [`QueryError`] through `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
parse_sql(sql).map_err(QueryError::from_sql_parse_error)
}
/// Parses `sql` and returns an owned copy of the entity name the statement targets, or `None`
/// when the statement names no entity.
///
/// # Errors
///
/// Returns the parse failure converted through `QueryError::from_sql_parse_error`.
#[doc(hidden)]
pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
    parse_sql_with_attribution(sql)
        .map_err(QueryError::from_sql_parse_error)
        // The attribution half of the parse result is not needed here.
        .map(|(parsed, _)| sql_statement_entity_name_from_statement(&parsed).map(str::to_string))
}
/// Parses `sql` and classifies the statement as DDL or query-side.
///
/// # Errors
///
/// Returns the parse failure converted through `QueryError::from_sql_parse_error`.
#[doc(hidden)]
pub fn sql_statement_surface(sql: &str) -> Result<SqlStatementSurface, QueryError> {
    parse_sql_with_attribution(sql)
        .map_err(QueryError::from_sql_parse_error)
        // The attribution half of the parse result is not needed here.
        .map(|(parsed, _)| sql_statement_surface_from_statement(&parsed))
}
/// Parses `sql` and classifies the statement into its shell surface (query, DDL, or update).
///
/// # Errors
///
/// Returns the parse failure converted through `QueryError::from_sql_parse_error`.
#[doc(hidden)]
pub fn sql_statement_shell_surface(sql: &str) -> Result<SqlStatementShellSurface, QueryError> {
    parse_sql_with_attribution(sql)
        .map_err(QueryError::from_sql_parse_error)
        // The attribution half of the parse result is not needed here.
        .map(|(parsed, _)| sql_statement_shell_surface_from_statement(&parsed))
}
/// Parses `sql` once and returns both dispatch facts about the statement: the entity it names
/// (if any) and whether it is an introspection statement.
///
/// # Errors
///
/// Returns the parse failure converted through `QueryError::from_sql_parse_error`.
#[doc(hidden)]
pub fn sql_statement_dispatch(sql: &str) -> Result<SqlStatementDispatch, QueryError> {
    parse_sql_with_attribution(sql)
        .map_err(QueryError::from_sql_parse_error)
        .map(|(parsed, _)| {
            let entity_name = sql_statement_entity_name_from_statement(&parsed).map(str::to_string);
            let requires_introspection =
                sql_statement_requires_introspection_from_statement(&parsed);

            SqlStatementDispatch::new(entity_name, requires_introspection)
        })
}
/// Maps a parsed statement onto the coarse DDL / query split.
///
/// The match is kept exhaustive (no wildcard arm) so that adding a `SqlStatement` variant forces
/// a decision here at compile time.
const fn sql_statement_surface_from_statement(statement: &SqlStatement) -> SqlStatementSurface {
    match statement {
        // Reads, writes, and introspection all share the query surface.
        SqlStatement::Select(_)
        | SqlStatement::Insert(_)
        | SqlStatement::Update(_)
        | SqlStatement::Delete(_)
        | SqlStatement::Describe(_)
        | SqlStatement::ShowIndexes(_)
        | SqlStatement::ShowColumns(_)
        | SqlStatement::ShowEntities(_)
        | SqlStatement::ShowStores(_)
        | SqlStatement::ShowMemory(_) => SqlStatementSurface::Query,
        #[cfg(feature = "sql-explain")]
        SqlStatement::Explain(_) => SqlStatementSurface::Query,
        SqlStatement::Ddl(_) => SqlStatementSurface::Ddl,
    }
}
/// Maps a parsed statement onto the three-way shell surface.
///
/// Only `UPDATE` maps to [`SqlStatementShellSurface::Update`]; `INSERT` and `DELETE` are reported
/// as `Query`. NOTE(review): `ensure_sql_statement_supported_for_surface` groups all three write
/// statements together, so confirm this asymmetry is intended.
///
/// The match is kept exhaustive (no wildcard arm) so that adding a `SqlStatement` variant forces
/// a decision here at compile time.
const fn sql_statement_shell_surface_from_statement(
    statement: &SqlStatement,
) -> SqlStatementShellSurface {
    match statement {
        SqlStatement::Select(_)
        | SqlStatement::Insert(_)
        | SqlStatement::Delete(_)
        | SqlStatement::Describe(_)
        | SqlStatement::ShowIndexes(_)
        | SqlStatement::ShowColumns(_)
        | SqlStatement::ShowEntities(_)
        | SqlStatement::ShowStores(_)
        | SqlStatement::ShowMemory(_) => SqlStatementShellSurface::Query,
        #[cfg(feature = "sql-explain")]
        SqlStatement::Explain(_) => SqlStatementShellSurface::Query,
        SqlStatement::Update(_) => SqlStatementShellSurface::Update,
        SqlStatement::Ddl(_) => SqlStatementShellSurface::Ddl,
    }
}
/// Reports whether a parsed statement is an introspection statement (`DESCRIBE`, the `SHOW …`
/// family, and `EXPLAIN` when the `sql-explain` feature is enabled).
///
/// The match is kept exhaustive (no wildcard arm) so that adding a `SqlStatement` variant forces
/// a decision here at compile time.
const fn sql_statement_requires_introspection_from_statement(statement: &SqlStatement) -> bool {
    match statement {
        // Data statements and DDL never count as introspection.
        SqlStatement::Select(_)
        | SqlStatement::Insert(_)
        | SqlStatement::Update(_)
        | SqlStatement::Delete(_)
        | SqlStatement::Ddl(_) => false,
        SqlStatement::Describe(_)
        | SqlStatement::ShowIndexes(_)
        | SqlStatement::ShowColumns(_)
        | SqlStatement::ShowEntities(_)
        | SqlStatement::ShowStores(_)
        | SqlStatement::ShowMemory(_) => true,
        #[cfg(feature = "sql-explain")]
        SqlStatement::Explain(_) => true,
    }
}
/// Borrows the entity name a parsed statement refers to.
///
/// Returns `None` for the statements that name no entity (`SHOW ENTITIES`, `SHOW STORES`,
/// `SHOW MEMORY`) and for a `DROP INDEX` whose optional entity is absent.
///
/// Both the outer and the nested matches are exhaustive (no wildcard arm) so that a new statement
/// kind forces a decision here at compile time.
const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
    match statement {
        SqlStatement::ShowEntities(_)
        | SqlStatement::ShowStores(_)
        | SqlStatement::ShowMemory(_) => None,
        SqlStatement::Select(select) => Some(select.entity.as_str()),
        SqlStatement::Insert(insert) => Some(insert.entity.as_str()),
        SqlStatement::Update(update) => Some(update.entity.as_str()),
        SqlStatement::Delete(delete) => Some(delete.entity.as_str()),
        SqlStatement::Describe(describe) => Some(describe.entity.as_str()),
        SqlStatement::ShowIndexes(show_indexes) => Some(show_indexes.entity.as_str()),
        SqlStatement::ShowColumns(show_columns) => Some(show_columns.entity.as_str()),
        #[cfg(feature = "sql-explain")]
        SqlStatement::Explain(explain) => match &explain.statement {
            SqlExplainTarget::Select(select) => Some(select.entity.as_str()),
            SqlExplainTarget::Delete(delete) => Some(delete.entity.as_str()),
        },
        SqlStatement::Ddl(ddl) => match ddl {
            SqlDdlStatement::CreateIndex(create_index) => Some(create_index.entity.as_str()),
            // The entity is optional on DROP INDEX. A manual match is used instead of
            // `Option::as_deref` because this is a `const fn`.
            SqlDdlStatement::DropIndex(drop_index) => match &drop_index.entity {
                Some(entity) => Some(entity.as_str()),
                None => None,
            },
            SqlDdlStatement::AlterTableAddColumn(add_column) => Some(add_column.entity.as_str()),
            SqlDdlStatement::AlterTableAlterColumn(alter_column) => {
                Some(alter_column.entity.as_str())
            }
            SqlDdlStatement::AlterTableDropColumn(drop_column) => {
                Some(drop_column.entity.as_str())
            }
            SqlDdlStatement::AlterTableRenameColumn(rename_column) => {
                Some(rename_column.entity.as_str())
            }
        },
    }
}
/// Runs a fallible `stage` under `measure_sql_stage` and pairs its success value with the
/// measured local-instruction count.
///
/// # Errors
///
/// Propagates the stage's own error; the measurement is discarded in that case.
fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
    let (local_instructions, outcome) = measure_sql_stage(stage);

    outcome.map(|value| (local_instructions, value))
}
impl<C: CanisterKind> DbSession<C> {
fn ensure_sql_statement_supported_for_surface(
statement: &SqlStatement,
surface: SqlCompiledCommandSurface,
) -> Result<(), QueryError> {
match (surface, statement) {
(
SqlCompiledCommandSurface::Query,
SqlStatement::Select(_)
| SqlStatement::Describe(_)
| SqlStatement::ShowIndexes(_)
| SqlStatement::ShowColumns(_)
| SqlStatement::ShowEntities(_)
| SqlStatement::ShowStores(_)
| SqlStatement::ShowMemory(_),
) => Ok(()),
#[cfg(feature = "sql-explain")]
(SqlCompiledCommandSurface::Query, SqlStatement::Explain(_)) => Ok(()),
(
SqlCompiledCommandSurface::Update,
SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
) => Ok(()),
(_, SqlStatement::Ddl(_)) => Err(QueryError::sql_lowering(
SqlLoweringCode::SqlDdlExecutionUnsupported,
)),
(SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::QueryRejectsInsert),
),
(SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::QueryRejectsUpdate),
),
(SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::QueryRejectsDelete),
),
(SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsSelect),
),
#[cfg(feature = "sql-explain")]
(SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsExplain),
),
(SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsDescribe),
),
(SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsShowIndexes),
),
(SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsShowColumns),
),
(SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsShowEntities),
),
(SqlCompiledCommandSurface::Update, SqlStatement::ShowStores(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsShowStores),
),
(SqlCompiledCommandSurface::Update, SqlStatement::ShowMemory(_)) => Err(
QueryError::sql_surface_mismatch(SqlSurfaceMismatchCode::UpdateRejectsShowMemory),
),
}
}
/// Compiles `sql` for entity `E` through `compile_sql_query_with_execution_context` and executes
/// the compiled command through `execute_compiled_sql_context_owned`.
///
/// # Errors
///
/// Propagates any error from compilation or execution.
pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
// Compilation also yields cache and phase attribution; this entry point does not report them.
let (compiled, _, _) = self.compile_sql_query_with_execution_context::<E>(sql)?;
self.execute_compiled_sql_context_owned::<E>(compiled)
}
/// Diagnostics variant of `execute_sql_query`: compiles and executes `sql` for entity `E` and
/// returns the statement result together with a [`SqlQueryExecutionAttribution`] built from the
/// measurements taken along the way.
///
/// Statement order matters here: every "before" reading is captured after compilation and
/// immediately ahead of execution, and every delta is computed right after execution succeeds.
///
/// # Errors
///
/// Propagates any error from compilation or execution; no attribution is returned in that case.
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
pub fn execute_sql_query_with_attribution<E>(
&self,
sql: &str,
) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
// Measure the whole compile step, including the failure path, before unwrapping its result.
let (compile_local_instructions, compiled) =
measure_sql_stage(|| self.compile_sql_query_with_execution_context::<E>(sql));
let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
// Baselines taken just before execution so the deltas below cover execution only.
let store_counters_before = StoreCounterSnapshot::capture();
let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
let pure_covering_row_assembly_before =
current_pure_covering_row_assembly_local_instructions();
// Execute inside the projection-materialization metrics scope to collect those metrics too.
let (executed, projection_materialization) =
with_sql_projection_materialization_metrics(|| {
self.execute_compiled_sql_context_with_phase_attribution::<E>(&compiled)
});
let (result, execute_cache_attribution, execute_phase_attribution) = executed?;
let store_counters = store_counters_before.delta_since();
// `saturating_sub` keeps each delta at zero if the current reading is below its baseline.
let pure_covering_decode_local_instructions =
current_pure_covering_decode_local_instructions()
.saturating_sub(pure_covering_decode_before);
let pure_covering_row_assembly_local_instructions =
current_pure_covering_row_assembly_local_instructions()
.saturating_sub(pure_covering_row_assembly_before);
let attribution = SqlQueryExecutionAttribution::from_inputs(
&result,
&SqlQueryExecutionAttributionInputs {
compile_local_instructions,
compile_phase_attribution,
compile_cache_attribution,
execute_cache_attribution,
execute_phase_attribution,
pure_covering_decode_local_instructions,
pure_covering_row_assembly_local_instructions,
projection_materialization,
store_counters,
},
);
Ok((result, attribution))
}
/// Compiles `sql` for entity `E` through `compile_sql_update` and executes the compiled command
/// through `execute_update_surface_compiled_sql_owned`.
///
/// # Errors
///
/// Propagates any error from compilation or execution.
pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let compiled = self.compile_sql_update::<E>(sql)?;
self.execute_update_surface_compiled_sql_owned::<E>(compiled)
}
/// Prepares the DDL statement in `sql` for entity `E` without executing it and returns a copy of
/// the resulting preparation report.
///
/// # Errors
///
/// Propagates any error from `prepare_sql_ddl_command`.
pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
// The catalog context returned alongside the prepared command is not needed for a report.
let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
Ok(prepared.report().clone())
}
/// Parses `sql`, loads the accepted schema catalog context for entity `E`, and prepares the
/// statement as a DDL command against that catalog.
///
/// Returns the catalog context together with the prepared command so that callers can execute
/// against the same snapshot the command was prepared from.
///
/// # Errors
///
/// * The parse failure, converted through `QueryError::from_sql_parse_error`.
/// * A catalog-context failure, converted through `QueryError::execute`.
/// * A preparation failure, converted through `QueryError::from_sql_ddl_prepare_error`.
fn prepare_sql_ddl_command<E>(
    &self,
    sql: &str,
) -> Result<(AcceptedSchemaCatalogContext, PreparedSqlDdlCommand), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (statement, _) =
        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
    let catalog = self
        .accepted_schema_catalog_context_for_query::<E>()
        .map_err(QueryError::execute)?;
    let schema_info = catalog.accepted_schema_info_for::<E>();
    // Same `map_err(..)?` propagation as the two steps above, replacing the hand-written
    // `match` that only re-wrapped the error.
    let prepared = prepare_sql_ddl_statement(
        &statement,
        catalog.snapshot(),
        &schema_info,
        E::Store::PATH,
    )
    .map_err(QueryError::from_sql_ddl_prepare_error)?;

    Ok((catalog, prepared))
}
/// Prepares and executes the DDL statement in `sql` for entity `E`.
///
/// A command that does not mutate the schema returns its report with status
/// `SqlDdlExecutionStatus::NoOp` and touches nothing. Otherwise the mutation is run against the
/// recovered store, the accepted-schema query cache for `E` is invalidated, and the report is
/// returned with status `SqlDdlExecutionStatus::Published` plus the rows-scanned and
/// index-keys-written metrics.
///
/// # Errors
///
/// * Any error from `prepare_sql_ddl_command`.
/// * `QueryError::unsupported_query()` when a mutating command carries no derivation.
/// * A store-recovery failure, converted through `QueryError::execute`.
/// * Any error from the mutation itself.
pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (catalog_before, command) = self.prepare_sql_ddl_command::<E>(sql)?;

    // Nothing to publish: report the prepared command as a no-op.
    if !command.mutates_schema() {
        let noop_report = command
            .report()
            .clone()
            .with_execution_status(SqlDdlExecutionStatus::NoOp);
        return Ok(SqlStatementResult::Ddl(noop_report));
    }

    let Some(derivation) = command.derivation() else {
        return Err(QueryError::unsupported_query());
    };

    let store = self
        .db
        .recovered_store(E::Store::PATH)
        .map_err(QueryError::execute)?;
    let (rows_scanned, index_keys_written) = Self::execute_prepared_sql_ddl_mutation::<E>(
        store,
        catalog_before.snapshot(),
        catalog_before.identity(),
        derivation,
        &command,
    )?;

    // Invalidate only after the mutation has succeeded.
    self.invalidate_accepted_schema_query_cache_for_entity::<E>();

    let published_report = command
        .report()
        .clone()
        .with_execution_status(SqlDdlExecutionStatus::Published)
        .with_execution_metrics(rows_scanned, index_keys_written);
    Ok(SqlStatementResult::Ddl(published_report))
}
fn execute_prepared_sql_ddl_mutation<E>(
store: crate::db::registry::StoreHandle,
accepted_before: &AcceptedSchemaSnapshot,
accepted_before_identity: crate::db::schema::AcceptedCatalogIdentity,
derivation: &crate::db::schema::SchemaDdlAcceptedSnapshotDerivation,
prepared: &PreparedSqlDdlCommand,
) -> Result<(usize, usize), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let metrics = match prepared.bound().statement() {
crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
execute_sql_ddl_field_addition(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?;
(0, 0)
}
crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
execute_sql_ddl_field_default_change(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?;
(0, 0)
}
crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
let rows_scanned = execute_sql_ddl_field_nullability_change(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?;
(rows_scanned, 0)
}
crate::db::sql::ddl::BoundSqlDdlStatement::DropColumn(_) => {
execute_sql_ddl_field_drop(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?;
(0, 0)
}
crate::db::sql::ddl::BoundSqlDdlStatement::RenameColumn(_) => {
execute_sql_ddl_field_rename(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?;
(0, 0)
}
crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
if create.candidate_index().key().is_field_path_only() =>
{
execute_sql_ddl_field_path_index_addition(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?
}
crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
execute_sql_ddl_expression_index_addition(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?
}
crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
execute_sql_ddl_secondary_index_drop(
store,
E::ENTITY_TAG,
E::PATH,
accepted_before,
accepted_before_identity,
derivation,
)
.map_err(QueryError::from_sql_ddl_execution_error)?;
(0, 0)
}
crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
};
Ok(metrics)
}
}