mod aggregate;
mod computed_projection;
mod dispatch;
mod explain;
mod projection;
mod surface;
use crate::{
db::{
DbSession, EntityResponse, GroupedTextCursorPageWithTrace, MissingRowPolicy,
PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
executor::EntityAuthority,
query::{
intent::StructuralQuery,
plan::{AccessPlannedQuery, VisibleIndexes},
},
sql::{
lowering::{
bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
prepare_sql_statement,
},
parser::{SqlStatement, parse_sql},
},
},
traits::{CanisterKind, EntityKind, EntityValue},
};
use crate::db::session::sql::aggregate::{
SqlAggregateSurface, parsed_requires_dedicated_sql_aggregate_lane,
unsupported_sql_aggregate_lane_message,
};
use crate::db::session::sql::surface::{
SqlSurface, session_sql_lane, sql_statement_route_from_statement, unsupported_sql_lane_message,
};
#[cfg(feature = "structural-read-metrics")]
pub use crate::db::session::sql::projection::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
pub use crate::db::session::sql::surface::{
SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
};
#[cfg(feature = "perf-attribution")]
pub use crate::db::{
session::sql::dispatch::LoweredSqlDispatchExecutorAttribution,
session::sql::projection::SqlProjectionTextExecutorAttribution,
};
/// Identifies which SQL entry point encountered a computed text projection it does not
/// serve, so the rejection message can name that entry point.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SqlComputedProjectionSurface {
    /// Rejection raised on behalf of `query_from_sql`.
    QueryFrom,
    /// Rejection raised on behalf of `execute_sql`.
    ExecuteSql,
    /// Rejection raised on behalf of `execute_sql_grouped`.
    ExecuteSqlGrouped,
}

/// Return the static rejection message for a computed text projection on `surface`.
///
/// Every message points the caller at `execute_sql_dispatch(...)`. The function is `const`
/// and only ever hands back string literals, so it never allocates.
const fn unsupported_sql_computed_projection_message(
    surface: SqlComputedProjectionSurface,
) -> &'static str {
    use SqlComputedProjectionSurface as Surface;

    match surface {
        Surface::ExecuteSqlGrouped => {
            "execute_sql_grouped rejects computed text projection; use execute_sql_dispatch(...)"
        }
        Surface::ExecuteSql => {
            "execute_sql rejects computed text projection; use execute_sql_dispatch(...)"
        }
        Surface::QueryFrom => {
            "query_from_sql does not accept computed text projection; use execute_sql_dispatch(...)"
        }
    }
}
/// Return the static rejection message for an `INSERT` or `UPDATE` statement that reached
/// `surface`.
///
/// The three query/execute surfaces name themselves and the rejected verb, and direct the
/// caller to `execute_sql_dispatch(...)`. The `Explain` surface reports that it requires
/// `EXPLAIN`, regardless of which write verb was supplied.
///
/// # Panics
///
/// Hits `unreachable!()` when `statement` is neither `Insert` nor `Update`. The caller
/// visible in this module (`query_from_sql_parsed`) only invokes this function after
/// matching one of those two variants.
const fn unsupported_sql_write_surface_message(
    surface: SqlSurface,
    statement: &SqlStatement,
) -> &'static str {
    // Dispatch on the verb first, then on the surface that refused it.
    match statement {
        SqlStatement::Insert(_) => match surface {
            SqlSurface::QueryFrom => "query_from_sql rejects INSERT; use execute_sql_dispatch(...)",
            SqlSurface::ExecuteSql => "execute_sql rejects INSERT; use execute_sql_dispatch(...)",
            SqlSurface::ExecuteSqlGrouped => {
                "execute_sql_grouped rejects INSERT; use execute_sql_dispatch(...)"
            }
            SqlSurface::Explain => "explain_sql requires EXPLAIN",
        },
        SqlStatement::Update(_) => match surface {
            SqlSurface::QueryFrom => "query_from_sql rejects UPDATE; use execute_sql_dispatch(...)",
            SqlSurface::ExecuteSql => "execute_sql rejects UPDATE; use execute_sql_dispatch(...)",
            SqlSurface::ExecuteSqlGrouped => {
                "execute_sql_grouped rejects UPDATE; use execute_sql_dispatch(...)"
            }
            SqlSurface::Explain => "explain_sql requires EXPLAIN",
        },
        // Non-write statements are listed explicitly (no wildcard) so that adding a new
        // statement variant forces this function to be revisited.
        SqlStatement::Select(_)
        | SqlStatement::Delete(_)
        | SqlStatement::Explain(_)
        | SqlStatement::Describe(_)
        | SqlStatement::ShowIndexes(_)
        | SqlStatement::ShowColumns(_)
        | SqlStatement::ShowEntities(_) => unreachable!(),
    }
}
impl<C: CanisterKind> DbSession<C> {
    /// Look up the indexes visible for `authority`'s store path and model, then build the
    /// access plan for `query` against that index set.
    ///
    /// Returns the visible-index set together with the plan built from it. A failure in
    /// either step is propagated as a `QueryError`.
    pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
        &self,
        query: StructuralQuery,
        authority: EntityAuthority,
    ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
        let store_path = authority.store_path();
        let model = authority.model();
        let indexes = self.visible_indexes_for_store_model(store_path, model)?;
        let planned = query.build_plan_with_visible_indexes(&indexes)?;

        Ok((indexes, planned))
    }

    /// Turn an already-parsed SQL statement into a typed `Query<E>`.
    ///
    /// The checks run in a fixed order, each producing an "unsupported query" error worded
    /// for the calling surface:
    ///
    /// 1. `INSERT` / `UPDATE` statements are refused (`lane_surface` picks the message).
    /// 2. Statements with a computed projection plan are refused (`computed_surface`).
    /// 3. Statements needing the dedicated aggregate lane are refused (`surface`).
    ///
    /// What remains is prepared against `E::MODEL`, lowered, and bound with
    /// `MissingRowPolicy::Ignore`. A lowered command that carries no query is refused with
    /// the lane message for `lane_surface`.
    fn query_from_sql_parsed<E>(
        parsed: &SqlParsedStatement,
        lane_surface: SqlSurface,
        computed_surface: SqlComputedProjectionSurface,
        surface: SqlAggregateSurface,
    ) -> Result<Query<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        let statement = &parsed.statement;

        if let SqlStatement::Insert(_) | SqlStatement::Update(_) = statement {
            let message = unsupported_sql_write_surface_message(lane_surface, statement);
            return Err(QueryError::unsupported_query(message));
        }

        if computed_projection::computed_sql_projection_plan(statement)?.is_some() {
            let message = unsupported_sql_computed_projection_message(computed_surface);
            return Err(QueryError::unsupported_query(message));
        }

        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
            let message = unsupported_sql_aggregate_lane_message(surface);
            return Err(QueryError::unsupported_query(message));
        }

        let prepared = parsed.prepare(E::MODEL.name())?;
        let lowered =
            lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
                .map_err(QueryError::from_sql_lowering_error)?;

        // The lane is classified before the query is extracted so the rejection below can
        // report it.
        let lane = session_sql_lane(&lowered);
        let unbound = lowered.query().cloned().ok_or_else(|| {
            QueryError::unsupported_query(unsupported_sql_lane_message(lane_surface, lane))
        })?;

        bind_lowered_sql_query::<E>(unbound, MissingRowPolicy::Ignore)
            .map_err(QueryError::from_sql_lowering_error)
    }

    /// Build the grouped `Query<E>` underlying a computed projection plan.
    ///
    /// The plan's base statement is cloned, prepared against `E::MODEL`, lowered, and bound
    /// with `MissingRowPolicy::Ignore`. The bound query must then pass the grouping check
    /// for the grouped surface. A lowered command without a query is refused with the
    /// `ExecuteSqlGrouped` lane message.
    fn grouped_query_from_computed_sql_projection_plan<E>(
        plan: &computed_projection::SqlComputedProjectionPlan,
    ) -> Result<Query<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        let prepared = prepare_sql_statement(plan.cloned_base_statement(), E::MODEL.name())
            .map_err(QueryError::from_sql_lowering_error)?;
        let lowered =
            lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
                .map_err(QueryError::from_sql_lowering_error)?;

        let unbound = match lowered.query().cloned() {
            Some(query) => query,
            None => {
                let lane = session_sql_lane(&lowered);
                return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
                    SqlSurface::ExecuteSqlGrouped,
                    lane,
                )));
            }
        };

        let bound = bind_lowered_sql_query::<E>(unbound, MissingRowPolicy::Ignore)
            .map_err(QueryError::from_sql_lowering_error)?;
        Self::ensure_sql_query_grouping(&bound, dispatch::SqlGroupingSurface::Grouped)?;

        Ok(bound)
    }

    /// Parse `sql` and pair the resulting statement with its route.
    ///
    /// # Errors
    ///
    /// Returns the parse failure converted through `QueryError::from_sql_parse_error`.
    pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
        parse_sql(sql)
            .map_err(QueryError::from_sql_parse_error)
            .map(|statement| {
                let route = sql_statement_route_from_statement(&statement);
                SqlParsedStatement::new(statement, route)
            })
    }

    /// Parse `sql` and return a clone of the route recorded on the parsed statement.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::parse_sql_statement`].
    pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
        self.parse_sql_statement(sql)
            .map(|parsed| parsed.route().clone())
    }

    /// Parse `sql` and convert it into a typed `Query<E>` without executing it.
    ///
    /// # Errors
    ///
    /// Returns a parse error, or any rejection raised by the shared statement checks using
    /// the `QueryFrom` surface messages (writes, computed projections, the dedicated
    /// aggregate lane, or a lowered command that carries no query).
    pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
    where
        E: EntityKind<Canister = C>,
    {
        self.parse_sql_statement(sql).and_then(|parsed| {
            Self::query_from_sql_parsed::<E>(
                &parsed,
                SqlSurface::QueryFrom,
                SqlComputedProjectionSurface::QueryFrom,
                SqlAggregateSurface::QueryFrom,
            )
        })
    }

    /// Parse `sql`, convert it into a `Query<E>`, and execute it.
    ///
    /// The query must pass the grouping check for the scalar surface before it runs.
    ///
    /// # Errors
    ///
    /// Returns a parse error, any rejection raised by the shared statement checks using the
    /// `ExecuteSql` surface messages, a grouping-check failure, or an execution error.
    pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        let parsed = self.parse_sql_statement(sql)?;
        let scalar_query = Self::query_from_sql_parsed::<E>(
            &parsed,
            SqlSurface::ExecuteSql,
            SqlComputedProjectionSurface::ExecuteSql,
            SqlAggregateSurface::ExecuteSql,
        )?;
        Self::ensure_sql_query_grouping(&scalar_query, dispatch::SqlGroupingSurface::Scalar)?;

        self.execute_query(&scalar_query)
    }

    /// Parse `sql` and execute it as a grouped query, optionally resuming from
    /// `cursor_token`.
    ///
    /// `DELETE` is refused up front. When the statement has a computed projection plan, the
    /// plan must be grouped; its base query is executed and the computed projection is then
    /// applied to the returned rows. Otherwise the statement goes through the shared
    /// statement checks and the grouping check for the grouped surface before execution.
    ///
    /// # Errors
    ///
    /// Returns a parse error, an "unsupported query" rejection worded for
    /// `execute_sql_grouped`, a grouping-check failure, or an execution/projection error.
    pub fn execute_sql_grouped<E>(
        &self,
        sql: &str,
        cursor_token: Option<&str>,
    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        let parsed = self.parse_sql_statement(sql)?;
        let statement = &parsed.statement;

        if let SqlStatement::Delete(_) = statement {
            return Err(QueryError::unsupported_query(
                "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
            ));
        }

        let computed_plan = computed_projection::computed_sql_projection_plan(statement)?;
        let Some(plan) = computed_plan else {
            // No computed projection: take the ordinary grouped path.
            let grouped_query = Self::query_from_sql_parsed::<E>(
                &parsed,
                SqlSurface::ExecuteSqlGrouped,
                SqlComputedProjectionSurface::ExecuteSqlGrouped,
                SqlAggregateSurface::ExecuteSqlGrouped,
            )?;
            Self::ensure_sql_query_grouping(
                &grouped_query,
                dispatch::SqlGroupingSurface::Grouped,
            )?;
            return self.execute_grouped(&grouped_query, cursor_token);
        };

        if !plan.is_grouped() {
            return Err(QueryError::unsupported_query(
                unsupported_sql_computed_projection_message(
                    SqlComputedProjectionSurface::ExecuteSqlGrouped,
                ),
            ));
        }

        // Run the plan's base query, then rewrite the rows through the computed projection
        // while keeping the cursor and trace from the base execution.
        let base_query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
        let (rows, continuation_cursor, execution_trace) = self
            .execute_grouped(&base_query, cursor_token)?
            .into_parts();
        let projected_rows =
            computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;

        Ok(PagedGroupedExecutionWithTrace::new(
            projected_rows,
            continuation_cursor,
            execution_trace,
        ))
    }

    /// Variant of `execute_sql_grouped` that runs the query through
    /// `execute_grouped_text_cursor` and returns a `GroupedTextCursorPageWithTrace`.
    ///
    /// It applies the same sequence of checks as `execute_sql_grouped` and reuses that
    /// surface's rejection messages.
    ///
    /// # Errors
    ///
    /// Returns a parse error, an "unsupported query" rejection worded for
    /// `execute_sql_grouped`, a grouping-check failure, or an execution/projection error.
    #[doc(hidden)]
    pub fn execute_sql_grouped_text_cursor<E>(
        &self,
        sql: &str,
        cursor_token: Option<&str>,
    ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        let parsed = self.parse_sql_statement(sql)?;
        let statement = &parsed.statement;

        if let SqlStatement::Delete(_) = statement {
            return Err(QueryError::unsupported_query(
                "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
            ));
        }

        let computed_plan = computed_projection::computed_sql_projection_plan(statement)?;
        let Some(plan) = computed_plan else {
            // No computed projection: take the ordinary grouped path.
            let grouped_query = Self::query_from_sql_parsed::<E>(
                &parsed,
                SqlSurface::ExecuteSqlGrouped,
                SqlComputedProjectionSurface::ExecuteSqlGrouped,
                SqlAggregateSurface::ExecuteSqlGrouped,
            )?;
            Self::ensure_sql_query_grouping(
                &grouped_query,
                dispatch::SqlGroupingSurface::Grouped,
            )?;
            return self.execute_grouped_text_cursor(&grouped_query, cursor_token);
        };

        if !plan.is_grouped() {
            return Err(QueryError::unsupported_query(
                unsupported_sql_computed_projection_message(
                    SqlComputedProjectionSurface::ExecuteSqlGrouped,
                ),
            ));
        }

        // Run the plan's base query, then rewrite the rows through the computed projection
        // while keeping the cursor and trace from the base execution.
        let base_query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
        let (rows, continuation_cursor, execution_trace) =
            self.execute_grouped_text_cursor(&base_query, cursor_token)?;
        let projected_rows =
            computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;

        Ok((projected_rows, continuation_cursor, execution_trace))
    }
}