mod computed;
mod lowered;
use crate::{
db::{
DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
executor::{
EntityAuthority, execute_sql_projection_rows_for_canister,
execute_sql_projection_text_rows_for_canister,
},
identifiers_tail_match,
query::intent::StructuralQuery,
session::sql::{
SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
aggregate::{
SqlAggregateSurface, parsed_requires_dedicated_sql_aggregate_lane,
unsupported_sql_aggregate_lane_message,
},
computed_projection,
projection::{
SqlProjectionPayload, projection_labels_from_entity_model,
projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
},
},
sql::lowering::{
LoweredSqlCommand, LoweredSqlQuery, bind_lowered_sql_query,
lower_sql_command_from_prepared_statement,
},
},
traits::{CanisterKind, EntityKind, EntityValue},
};
/// Outcome of one generated query surface SQL dispatch, paired with the entity
/// metadata that was resolved before execution.
#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
/// Name of the resolved entity model; `""` when the attempt ended before an
/// entity was resolved or when the statement was `SHOW ENTITIES`.
entity_name: &'static str,
/// Primary-key field name of the resolved entity; populated only when the
/// statement route is an EXPLAIN route.
explain_order_field: Option<&'static str>,
/// Dispatch payload on success, or the query error that stopped the attempt.
result: Result<SqlDispatchResult, QueryError>,
}
impl GeneratedSqlDispatchAttempt {
    /// Bundles the resolved entity metadata with the dispatch outcome.
    const fn new(
        entity_name: &'static str,
        explain_order_field: Option<&'static str>,
        result: Result<SqlDispatchResult, QueryError>,
    ) -> Self {
        Self {
            result,
            explain_order_field,
            entity_name,
        }
    }

    /// Returns the entity name recorded for this attempt (may be empty).
    #[must_use]
    pub const fn entity_name(&self) -> &'static str {
        self.entity_name
    }

    /// Returns the field name recorded for EXPLAIN ordering, if any.
    #[must_use]
    pub const fn explain_order_field(&self) -> Option<&'static str> {
        self.explain_order_field
    }

    /// Consumes the attempt and yields only the dispatch outcome.
    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
        let Self { result, .. } = self;
        result
    }
}
/// Identifies which SQL entry point is validating query grouping, so the
/// check can apply the matching expectation and rejection message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
/// `execute_sql` surface: grouped SELECT is rejected.
Scalar,
/// `execute_sql_dispatch` surface: grouped SELECT execution is rejected.
Dispatch,
/// Generated SQL query surface: grouped SELECT execution is rejected.
GeneratedQuerySurface,
/// `execute_sql_grouped` surface: the query must carry grouping.
Grouped,
}
const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
match surface {
SqlGroupingSurface::Scalar => {
"execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
}
SqlGroupingSurface::Dispatch => {
"execute_sql_dispatch rejects grouped SELECT execution; use execute_sql_grouped(...)"
}
SqlGroupingSurface::GeneratedQuerySurface => {
"generated SQL query surface rejects grouped SELECT execution; use execute_sql_grouped(...)"
}
SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
}
}
/// Strips surrounding whitespace from the incoming SQL text.
///
/// # Errors
///
/// Returns an unsupported-query error when nothing remains after trimming.
fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
    match sql.trim() {
        "" => Err(QueryError::unsupported_query(
            "query endpoint requires a non-empty SQL string",
        )),
        statement => Ok(statement),
    }
}
/// Collects the model name of every authority, in slice order, as owned strings.
fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
    authorities
        .iter()
        .map(|authority| authority.model().name().to_string())
        .collect()
}
/// Picks the first authority whose model name tail-matches the entity named
/// by `route`.
///
/// # Errors
///
/// Returns the unsupported-entity error (listing every supported entity) when
/// no authority matches.
fn authority_for_generated_sql_route(
    route: &SqlStatementRoute,
    authorities: &[EntityAuthority],
) -> Result<EntityAuthority, QueryError> {
    let requested = route.entity();

    authorities
        .iter()
        .find(|candidate| identifiers_tail_match(requested, candidate.model().name()))
        .copied()
        .ok_or_else(|| unsupported_generated_sql_entity_error(requested, authorities))
}
/// Builds the unsupported-query error for an entity that no authority serves,
/// listing the supported model names separated by `", "`.
fn unsupported_generated_sql_entity_error(
    entity_name: &str,
    authorities: &[EntityAuthority],
) -> QueryError {
    let supported = authorities
        .iter()
        .map(|authority| authority.model().name())
        .collect::<Vec<_>>()
        .join(", ");

    QueryError::unsupported_query(format!(
        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
    ))
}
impl<C: CanisterKind> DbSession<C> {
/// Plans `query`, runs the projection executor for `authority`, and returns
/// the rows together with their column labels and row count.
///
/// # Errors
///
/// Propagates plan-building errors; executor failures are wrapped with
/// `QueryError::execute`.
fn execute_structural_sql_projection(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
) -> Result<SqlProjectionPayload, QueryError> {
    let plan = query.build_plan()?;
    let model = authority.model();
    let projection = plan.projection_spec(model);
    // Labels must be captured before the projection spec is moved into the executor.
    let columns = projection_labels_from_projection_spec(&projection);

    execute_sql_projection_rows_for_canister(
        &self.db,
        self.debug,
        model,
        projection,
        authority,
        plan,
    )
    .map_err(QueryError::execute)
    .map(|projected| {
        let (rows, row_count) = projected.into_parts();
        SqlProjectionPayload::new(columns, rows, row_count)
    })
}
/// Plans `query`, runs the text-row projection executor for `authority`, and
/// returns a `SqlDispatchResult::ProjectionText` with labels, rows, and count.
///
/// # Errors
///
/// Propagates plan-building errors; executor failures are wrapped with
/// `QueryError::execute`.
fn execute_structural_sql_projection_text(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    let plan = query.build_plan()?;
    let model = authority.model();
    let projection = plan.projection_spec(model);
    // Labels must be captured before the projection spec is moved into the executor.
    let columns = projection_labels_from_projection_spec(&projection);

    execute_sql_projection_text_rows_for_canister(
        &self.db,
        self.debug,
        model,
        projection,
        authority,
        plan,
    )
    .map_err(QueryError::execute)
    .map(|projected| {
        let (rows, row_count) = projected.into_parts();
        SqlDispatchResult::ProjectionText {
            columns,
            rows,
            row_count,
        }
    })
}
/// Runs a typed DELETE through the delete executor (inside `with_metrics`) and
/// returns the deleted rows as a projection dispatch result labelled from
/// `E::MODEL`.
///
/// # Errors
///
/// Propagates planning errors; executor failures are wrapped with
/// `QueryError::execute`.
fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let executable = query.plan()?.into_executable();
    let (kernel_rows, row_count) = self
        .with_metrics(|| {
            self.delete_executor::<E>()
                .execute_sql_projection(executable)
        })
        .map_err(QueryError::execute)?
        .into_parts();

    let rows = sql_projection_rows_from_kernel_rows(kernel_rows);
    let payload =
        SqlProjectionPayload::new(projection_labels_from_entity_model(E::MODEL), rows, row_count);

    Ok(payload.into_dispatch_result())
}
/// Checks that the typed query's grouping matches what `surface` accepts.
///
/// The grouped surface requires a grouped query; every other surface requires
/// a non-grouped one.
///
/// # Errors
///
/// Returns an unsupported-query error carrying the surface-specific message
/// when the grouping does not match.
pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
    query: &Query<E>,
    surface: SqlGroupingSurface,
) -> Result<(), QueryError>
where
    E: EntityKind,
{
    // Kept as an exhaustive match so a new surface variant forces a decision here.
    let surface_expects_grouping = match surface {
        SqlGroupingSurface::Scalar
        | SqlGroupingSurface::Dispatch
        | SqlGroupingSurface::GeneratedQuerySurface => false,
        SqlGroupingSurface::Grouped => true,
    };

    if query.has_grouping() == surface_expects_grouping {
        return Ok(());
    }

    Err(QueryError::unsupported_query(
        unsupported_sql_grouping_message(surface),
    ))
}
/// Checks that a lowered command's query grouping matches what `surface`
/// accepts. Commands that carry no query pass unconditionally.
///
/// The grouped surface requires a grouped query; every other surface requires
/// a non-grouped one.
///
/// # Errors
///
/// Returns an unsupported-query error carrying the surface-specific message
/// when the grouping does not match.
pub(in crate::db::session::sql) fn ensure_lowered_sql_query_grouping(
    lowered: &LoweredSqlCommand,
    surface: SqlGroupingSurface,
) -> Result<(), QueryError> {
    let Some(query) = lowered.query() else {
        return Ok(());
    };

    // Kept as an exhaustive match so a new surface variant forces a decision here.
    let surface_expects_grouping = match surface {
        SqlGroupingSurface::Scalar
        | SqlGroupingSurface::Dispatch
        | SqlGroupingSurface::GeneratedQuerySurface => false,
        SqlGroupingSurface::Grouped => true,
    };

    if query.has_grouping() == surface_expects_grouping {
        return Ok(());
    }

    Err(QueryError::unsupported_query(
        unsupported_sql_grouping_message(surface),
    ))
}
/// Parses `sql` and dispatches the resulting statement against entity `E`.
///
/// # Errors
///
/// Returns the parse error if `sql` cannot be parsed, otherwise whatever
/// `execute_sql_dispatch_parsed` reports.
pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.parse_sql_statement(sql)
        .and_then(|statement| self.execute_sql_dispatch_parsed::<E>(&statement))
}
/// Dispatches one already-parsed SQL statement against entity type `E`.
///
/// Behaviour by `parsed.route()`:
/// - `Query`: rejects statements flagged by
///   `parsed_requires_dedicated_sql_aggregate_lane`, runs a computed
///   projection when a computed plan is derived from the statement, and
///   otherwise lowers the statement for `E`, rejects grouped queries, and
///   executes the lowered SELECT or DELETE.
/// - `Explain`: explains a computed projection when one is derived; otherwise
///   lowers the prepared statement and returns the execution explain when
///   available, falling back to `explain_for_model`.
/// - `Describe` / `ShowIndexes` / `ShowColumns` / `ShowEntities`: returns the
///   matching metadata from the session helpers.
///
/// # Errors
///
/// Returns an unsupported-query error for aggregate-lane statements, grouped
/// queries, and lowered commands that carry neither SELECT nor DELETE;
/// lowering failures are mapped through `QueryError::from_sql_lowering_error`;
/// other errors propagate from the helpers called here.
pub fn execute_sql_dispatch_parsed<E>(
&self,
parsed: &SqlParsedStatement,
) -> Result<SqlDispatchResult, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
match parsed.route() {
SqlStatementRoute::Query { .. } => {
// Aggregate statements are refused on this surface before any lowering.
if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
return Err(QueryError::unsupported_query(
unsupported_sql_aggregate_lane_message(
SqlAggregateSurface::ExecuteSqlDispatch,
),
));
}
// A computed projection plan, when present, short-circuits normal lowering.
if let Some(plan) =
computed_projection::computed_sql_projection_plan(&parsed.statement)?
{
return self.execute_computed_sql_projection_dispatch::<E>(plan);
}
let lowered = parsed
.lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
// This surface does not execute grouped SELECT.
Self::ensure_lowered_sql_query_grouping(&lowered, SqlGroupingSurface::Dispatch)?;
match lowered.query() {
Some(LoweredSqlQuery::Select(select)) => self
.execute_lowered_sql_dispatch_select_core(
select,
EntityAuthority::for_type::<E>(),
),
Some(LoweredSqlQuery::Delete(delete)) => {
// DELETE is bound back to a typed query for `E`.
// NOTE(review): `MissingRowPolicy::Ignore` presumably tolerates rows that
// are already gone — confirm against the policy's definition.
let typed_query = bind_lowered_sql_query::<E>(
LoweredSqlQuery::Delete(delete.clone()),
MissingRowPolicy::Ignore,
)
.map_err(QueryError::from_sql_lowering_error)?;
self.execute_typed_sql_delete(&typed_query)
}
None => Err(QueryError::unsupported_query(
"execute_sql_dispatch accepts SELECT or DELETE only",
)),
}
}
SqlStatementRoute::Explain { .. } => {
// Computed projections have their own explain path.
if let Some((mode, plan)) =
computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
{
return Self::explain_computed_sql_projection_dispatch::<E>(mode, plan)
.map(SqlDispatchResult::Explain);
}
// Unlike the Query route, EXPLAIN is lowered from the prepared statement.
let lowered = lower_sql_command_from_prepared_statement(
parsed.prepare(E::MODEL.name())?,
E::MODEL.primary_key.name,
)
.map_err(QueryError::from_sql_lowering_error)?;
// Prefer the execution explain when the helper produces one.
if let Some(explain) = self.explain_lowered_sql_execution_for_authority(
&lowered,
EntityAuthority::for_type::<E>(),
)? {
return Ok(SqlDispatchResult::Explain(explain));
}
// Otherwise fall back to the model-level explain of the lowered command.
lowered
.explain_for_model(E::MODEL)
.map(SqlDispatchResult::Explain)
}
SqlStatementRoute::Describe { .. } => {
Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
}
SqlStatementRoute::ShowIndexes { .. } => {
Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
}
SqlStatementRoute::ShowColumns { .. } => {
Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
}
SqlStatementRoute::ShowEntities => {
Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
}
}
}
/// Dispatches one parsed query or EXPLAIN statement for a runtime-resolved
/// `authority` instead of a compile-time entity type.
///
/// Behaviour by `parsed.route()`:
/// - `Query`: rejects statements flagged by
///   `parsed_requires_dedicated_sql_aggregate_lane`, runs a computed
///   projection when a computed plan is derived from the statement, and
///   otherwise lowers the statement for the authority's model, rejects
///   grouped queries, and executes the lowered query.
/// - `Explain`: explains a computed projection when one is derived; otherwise
///   lowers the statement and returns the execution explain when available,
///   falling back to `explain_for_model`.
/// - All metadata routes are rejected here.
///
/// # Errors
///
/// Returns an unsupported-query error for aggregate-lane statements, grouped
/// queries, and `Describe` / `ShowIndexes` / `ShowColumns` / `ShowEntities`
/// routes; other errors propagate from the helpers called here.
#[doc(hidden)]
pub fn execute_generated_query_surface_dispatch_for_authority(
&self,
parsed: &SqlParsedStatement,
authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
match parsed.route() {
SqlStatementRoute::Query { .. } => {
// Aggregate statements are refused on this surface before any lowering.
if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
return Err(QueryError::unsupported_query(
unsupported_sql_aggregate_lane_message(
SqlAggregateSurface::GeneratedQuerySurface,
),
));
}
// A computed projection plan, when present, short-circuits normal lowering.
if let Some(plan) =
computed_projection::computed_sql_projection_plan(&parsed.statement)?
{
return self
.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
}
let lowered = parsed.lower_query_lane_for_entity(
authority.model().name(),
authority.model().primary_key.name,
)?;
// This surface does not execute grouped SELECT.
Self::ensure_lowered_sql_query_grouping(
&lowered,
SqlGroupingSurface::GeneratedQuerySurface,
)?;
self.execute_lowered_sql_dispatch_query_for_authority(&lowered, authority)
}
SqlStatementRoute::Explain { .. } => {
// Computed projections have their own explain path.
if let Some((mode, plan)) =
computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
{
return Self::explain_computed_sql_projection_dispatch_for_authority(
mode, plan, authority,
)
.map(SqlDispatchResult::Explain);
}
// NOTE(review): EXPLAIN is lowered here with `lower_query_lane_for_entity`,
// whereas `execute_sql_dispatch_parsed` lowers it from the prepared statement
// via `lower_sql_command_from_prepared_statement` — confirm the difference is
// intentional.
let lowered = parsed.lower_query_lane_for_entity(
authority.model().name(),
authority.model().primary_key.name,
)?;
// Prefer the execution explain when the helper produces one.
if let Some(explain) =
self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
{
return Ok(SqlDispatchResult::Explain(explain));
}
// Otherwise fall back to the model-level explain of the lowered command.
lowered
.explain_for_model(authority.model())
.map(SqlDispatchResult::Explain)
}
// Metadata statements are not served by this entry point.
SqlStatementRoute::Describe { .. }
| SqlStatementRoute::ShowIndexes { .. }
| SqlStatementRoute::ShowColumns { .. }
| SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
"generated SQL query surface requires query or EXPLAIN statement lanes",
)),
}
}
#[doc(hidden)]
#[must_use]
pub fn execute_generated_query_surface_sql(
&self,
sql: &str,
authorities: &[EntityAuthority],
) -> GeneratedSqlDispatchAttempt {
let sql_trimmed = match trim_generated_query_sql_input(sql) {
Ok(sql_trimmed) => sql_trimmed,
Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
};
let parsed = match self.parse_sql_statement(sql_trimmed) {
Ok(parsed) => parsed,
Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
};
if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
return GeneratedSqlDispatchAttempt::new(
"",
None,
Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
authorities,
))),
);
}
let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
Ok(authority) => authority,
Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
};
let entity_name = authority.model().name();
let explain_order_field = parsed
.route()
.is_explain()
.then_some(authority.model().primary_key.name);
let result = match parsed.route() {
SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
}
SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
self.describe_entity_model(authority.model()),
)),
SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
self.show_indexes_for_store_model(authority.store_path(), authority.model()),
)),
SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
self.show_columns_for_model(authority.model()),
)),
SqlStatementRoute::ShowEntities => unreachable!(
"SHOW ENTITIES is handled before authority resolution for generated query dispatch"
),
};
GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
}
}