mod computed;
mod lowered;
use crate::{
db::{
DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
data::UpdatePatch,
executor::{EntityAuthority, MutationMode},
identifiers_tail_match,
query::{intent::StructuralQuery, plan::AccessPlannedQuery},
session::sql::{
SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
aggregate::parsed_requires_dedicated_sql_aggregate_lane,
computed_projection,
projection::{
SqlProjectionPayload, execute_sql_projection_rows_for_canister,
execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
},
},
sql::lowering::{
LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
},
sql::parser::{
SqlAggregateCall, SqlAggregateKind, SqlInsertStatement, SqlOrderDirection,
SqlProjection, SqlSelectItem, SqlStatement, SqlTextFunction, SqlUpdateStatement,
},
},
model::{entity::resolve_field_slot, field::FieldKind},
traits::{CanisterKind, EntityKind, EntityValue},
types::{Timestamp, Ulid},
value::Value,
};
#[cfg(feature = "perf-attribution")]
pub use lowered::LoweredSqlDispatchExecutorAttribution;
/// Outcome of one generated query-surface SQL dispatch attempt, bundled with
/// the entity context that was resolved while routing the statement.
#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
// Name of the entity the statement was routed to. The producer in this file
// passes an empty string when no entity was resolved.
entity_name: &'static str,
// Field name carried for EXPLAIN routes. The producer in this file fills it
// with the resolved entity's primary-key name; `None` for other routes.
explain_order_field: Option<&'static str>,
// Dispatch outcome, held until the caller consumes it via `into_result`.
result: Result<SqlDispatchResult, QueryError>,
}
impl GeneratedSqlDispatchAttempt {
/// Bundles a dispatch outcome with the entity name and optional EXPLAIN
/// order field. Private: attempts are only constructed inside this module.
const fn new(
entity_name: &'static str,
explain_order_field: Option<&'static str>,
result: Result<SqlDispatchResult, QueryError>,
) -> Self {
Self {
entity_name,
explain_order_field,
result,
}
}
/// Returns the entity name recorded for this attempt.
#[must_use]
pub const fn entity_name(&self) -> &'static str {
self.entity_name
}
/// Returns the EXPLAIN order field recorded for this attempt, if any.
#[must_use]
pub const fn explain_order_field(&self) -> Option<&'static str> {
self.explain_order_field
}
/// Consumes the attempt and yields the stored dispatch outcome.
pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
self.result
}
}
/// Identifies which SQL execution surface a query is being checked against,
/// so grouped and non-grouped queries can be rejected on the wrong surface.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
// Surface that accepts only queries without grouping.
Scalar,
// Surface that accepts only queries with grouping.
Grouped,
}
/// Returns the rejection message used when a query's grouping shape does not
/// fit the given SQL execution surface.
const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
    const SCALAR_SURFACE_MESSAGE: &str =
        "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)";
    const GROUPED_SURFACE_MESSAGE: &str = "execute_sql_grouped requires grouped SQL query intent";

    match surface {
        SqlGroupingSurface::Scalar => SCALAR_SURFACE_MESSAGE,
        SqlGroupingSurface::Grouped => GROUPED_SURFACE_MESSAGE,
    }
}
/// Strips surrounding whitespace from the incoming SQL text.
///
/// # Errors
///
/// Returns an unsupported-query error when nothing remains after trimming.
fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
    match sql.trim() {
        "" => Err(QueryError::unsupported_query(
            "query endpoint requires a non-empty SQL string",
        )),
        trimmed => Ok(trimmed),
    }
}
/// Collects the model name of every authority, preserving slice order.
fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
    authorities
        .iter()
        .map(|authority| authority.model().name().to_string())
        .collect()
}
/// Derives one output label per explicit projection item of a SELECT.
///
/// An item's alias is used when `projection_alias` reports one; otherwise the
/// label is rendered from the item itself. Returns `Ok(None)` when the
/// projection is not an explicit item list.
///
/// # Errors
///
/// Returns an invariant error when `statement` is not a SELECT.
fn sql_projection_labels_from_select_statement(
    statement: &SqlStatement,
) -> Result<Option<Vec<String>>, QueryError> {
    let SqlStatement::Select(select) = statement else {
        return Err(QueryError::invariant(
            "SQL projection labels require SELECT statement shape",
        ));
    };
    let items = match &select.projection {
        SqlProjection::Items(items) => items,
        _ => return Ok(None),
    };

    let mut labels = Vec::new();
    for (index, item) in items.iter().enumerate() {
        let label = match select.projection_alias(index) {
            Some(alias) => alias.to_string(),
            None => grouped_sql_projection_item_label(item),
        };
        labels.push(label);
    }

    Ok(Some(labels))
}
/// Renders the default (un-aliased) label for one SELECT projection item:
/// the field name itself, or `NAME(arg)` for aggregate and text-function calls.
fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
    match item {
        SqlSelectItem::Field(field) => field.clone(),
        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
        SqlSelectItem::TextFunction(call) => {
            let function_name = grouped_sql_text_function_name(call.function);
            let argument = &call.field;
            format!("{function_name}({argument})")
        }
    }
}
/// Returns the alias of the first projection item of a SELECT, if one is
/// declared; `None` for non-SELECT statements or when there is no alias.
fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
    if let SqlStatement::Select(select) = statement {
        select.projection_alias(0).map(str::to_string)
    } else {
        None
    }
}
/// Renders an aggregate call as `KIND(field)`, or `KIND(*)` when the call has
/// no target field.
fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
    let function = match aggregate.kind {
        SqlAggregateKind::Count => "COUNT",
        SqlAggregateKind::Sum => "SUM",
        SqlAggregateKind::Avg => "AVG",
        SqlAggregateKind::Min => "MIN",
        SqlAggregateKind::Max => "MAX",
    };
    let argument = aggregate.field.as_deref().unwrap_or("*");

    format!("{function}({argument})")
}
/// Maps a parsed SQL text function to the upper-case name used in labels.
const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
    match function {
        // Whitespace trimming.
        SqlTextFunction::Trim => "TRIM",
        SqlTextFunction::Ltrim => "LTRIM",
        SqlTextFunction::Rtrim => "RTRIM",

        // Case conversion.
        SqlTextFunction::Lower => "LOWER",
        SqlTextFunction::Upper => "UPPER",

        // Measurement and search.
        SqlTextFunction::Length => "LENGTH",
        SqlTextFunction::Position => "POSITION",
        SqlTextFunction::StartsWith => "STARTS_WITH",
        SqlTextFunction::EndsWith => "ENDS_WITH",
        SqlTextFunction::Contains => "CONTAINS",

        // Slicing and rewriting.
        SqlTextFunction::Left => "LEFT",
        SqlTextFunction::Right => "RIGHT",
        SqlTextFunction::Substring => "SUBSTRING",
        SqlTextFunction::Replace => "REPLACE",
    }
}
/// Picks the first authority whose model name tail-matches the entity named
/// by the SQL route.
///
/// # Errors
///
/// Returns an unsupported-query error listing the supported entities when no
/// authority matches.
fn authority_for_generated_sql_route(
    route: &SqlStatementRoute,
    authorities: &[EntityAuthority],
) -> Result<EntityAuthority, QueryError> {
    let sql_entity = route.entity();

    authorities
        .iter()
        .find(|authority| identifiers_tail_match(sql_entity, authority.model().name()))
        .copied()
        .ok_or_else(|| unsupported_generated_sql_entity_error(sql_entity, authorities))
}
/// Builds the unsupported-query error for an entity the query endpoint does
/// not serve, listing every supported model name separated by `", "`.
fn unsupported_generated_sql_entity_error(
    entity_name: &str,
    authorities: &[EntityAuthority],
) -> QueryError {
    let supported = authorities
        .iter()
        .map(|authority| authority.model().name())
        .collect::<Vec<_>>()
        .join(", ");

    QueryError::unsupported_query(format!(
        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
    ))
}
/// Checks that the entity named in a SQL write statement tail-matches the
/// model name of `E`.
///
/// # Errors
///
/// Returns the lowering `EntityMismatch` error (converted to `QueryError`)
/// when the names do not match.
fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
where
    E: EntityKind,
{
    let expected_entity = E::MODEL.name();
    if !identifiers_tail_match(sql_entity, expected_entity) {
        return Err(QueryError::from_sql_lowering_error(
            SqlLoweringError::EntityMismatch {
                sql_entity: sql_entity.to_string(),
                expected_entity,
            },
        ));
    }

    Ok(())
}
/// Converts a SQL literal into the entity's key type.
///
/// The literal is tried as-is first. If that fails, a non-negative `Int` is
/// retried as `Uint`, and a `Uint` that fits in `i64` is retried as `Int`.
///
/// # Errors
///
/// Returns an unsupported-query error naming `pk_name` when neither the
/// literal nor its signedness-flipped form converts to `E::Key`.
fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
where
    E: EntityKind,
{
    let incompatible_literal = || {
        QueryError::unsupported_query(format!(
            "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
        ))
    };

    if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
        return Ok(key);
    }

    // Retry with the opposite integer signedness when the value is representable.
    let retried = match value {
        Value::Int(signed) if *signed >= 0 => Value::Uint(signed.cast_unsigned()),
        Value::Uint(unsigned) if i64::try_from(*unsigned).is_ok() => {
            Value::Int(unsigned.cast_signed())
        }
        _ => return Err(incompatible_literal()),
    };

    <E::Key as crate::traits::FieldValue>::from_value(&retried).ok_or_else(incompatible_literal)
}
/// Produces a freshly generated primary-key value when the entity's primary
/// key kind is `Ulid`; returns `None` for every other key kind.
fn sql_write_generated_primary_key_value<E>() -> Option<Value>
where
    E: EntityKind,
{
    if matches!(E::MODEL.primary_key.kind(), FieldKind::Ulid) {
        Some(Value::Ulid(Ulid::generate()))
    } else {
        None
    }
}
/// Normalizes a SQL literal for the named field of `E`.
///
/// A non-negative `Int` written to a `Uint` field becomes `Uint`, and a `Uint`
/// that fits in `i64` written to an `Int` field becomes `Int`. Every other
/// combination is passed through as a clone.
///
/// # Errors
///
/// Returns an invariant error when `field_name` does not resolve against the
/// entity model.
fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
where
    E: EntityKind,
{
    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
        return Err(QueryError::invariant(
            "SQL write field must resolve against the target entity model",
        ));
    };

    Ok(match (E::MODEL.fields()[field_slot].kind(), value) {
        (FieldKind::Uint, Value::Int(signed)) if *signed >= 0 => {
            Value::Uint(signed.cast_unsigned())
        }
        (FieldKind::Int, Value::Uint(unsigned)) if i64::try_from(*unsigned).is_ok() => {
            Value::Int(unsigned.cast_signed())
        }
        _ => value.clone(),
    })
}
/// Returns the `("created_at", "updated_at")` field-name pair when the entity
/// model declares both fields; `None` when either is missing.
fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
where
    E: EntityKind,
{
    const CREATED_AT: &str = "created_at";
    const UPDATED_AT: &str = "updated_at";

    let model_declares = |field_name: &str| resolve_field_slot(E::MODEL, field_name).is_some();

    (model_declares(CREATED_AT) && model_declares(UPDATED_AT))
        .then_some((CREATED_AT, UPDATED_AT))
}
fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
where
E: EntityKind,
{
if !statement.columns.is_empty() {
return statement.columns.clone();
}
let timestamp_fields = sql_write_system_timestamp_fields::<E>();
let columns: Vec<String> = E::MODEL
.fields()
.iter()
.filter(|field| {
!matches!(
timestamp_fields,
Some((created_at, updated_at))
if field.name() == created_at || field.name() == updated_at
)
})
.map(|field| field.name().to_string())
.collect();
let pk_name = E::MODEL.primary_key.name;
if sql_write_generated_primary_key_value::<E>().is_none() {
return columns;
}
let generated_key_omitted_columns: Vec<String> = columns
.iter()
.filter(|field| field.as_str() != pk_name)
.cloned()
.collect();
let first_width = statement.values.first().map(Vec::len);
if first_width == Some(generated_key_omitted_columns.len()) {
return generated_key_omitted_columns;
}
columns
}
/// Verifies that every VALUES tuple has exactly one literal per column.
///
/// # Errors
///
/// Returns an invalid-syntax parse error (converted to `QueryError`) when any
/// tuple's length differs from the column count.
fn validate_sql_insert_tuple_lengths(
    columns: &[String],
    values: &[Vec<Value>],
) -> Result<(), QueryError> {
    let expected_width = columns.len();
    if values.iter().all(|tuple| tuple.len() == expected_width) {
        return Ok(());
    }

    Err(QueryError::from_sql_parse_error(
        crate::db::sql::parser::SqlParseError::invalid_syntax(
            "INSERT column list and VALUES tuple length must match",
        ),
    ))
}
impl<C: CanisterKind> DbSession<C> {
/// Reads every declared field of `entity`, by index, into one projection row.
///
/// # Errors
///
/// Returns an invariant error when the entity yields no value for a declared
/// field index.
fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let field_count = E::MODEL.fields().len();

    (0..field_count)
        .map(|index| {
            entity.get_value_by_index(index).ok_or_else(|| {
                QueryError::invariant(
                    "SQL write dispatch projection row must include every declared field",
                )
            })
        })
        .collect()
}
/// Turns written entities into a `Projection` dispatch result whose columns
/// are labeled from the entity model's fields.
///
/// The reported row count saturates at `u32::MAX`.
///
/// # Errors
///
/// Propagates the first row-extraction failure.
fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let columns = projection_labels_from_fields(E::MODEL.fields());

    let mut rows = Vec::with_capacity(entities.len());
    for entity in entities {
        rows.push(Self::sql_write_dispatch_row(entity)?);
    }

    let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);

    Ok(SqlDispatchResult::Projection {
        columns,
        rows,
        row_count,
    })
}
/// Builds the key and structural patch for one INSERT VALUES tuple.
///
/// The key comes from the literal aligned with the primary key column when
/// `columns` contains it. Otherwise a key value is generated, if the entity's
/// key kind supports generation, and also written into the patch. Each
/// column/value pair is then normalized and applied in order. When the model
/// declares both system timestamp fields, both are set to the same current
/// timestamp last.
///
/// # Errors
///
/// - invariant error when the primary key column has no aligned literal;
/// - unsupported-query error when the primary key column is absent and no key
///   can be generated, or when a key literal does not convert to `E::Key`;
/// - any field-normalization or patch-construction failure.
fn sql_insert_patch_and_key<E>(
    columns: &[String],
    values: &[Value],
) -> Result<(E::Key, UpdatePatch), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let pk_name = E::MODEL.primary_key.name;

    let (key, generated_pk_value) = match columns.iter().position(|field| field == pk_name) {
        Some(pk_index) => {
            let pk_value = values.get(pk_index).ok_or_else(|| {
                QueryError::invariant(
                    "INSERT primary key column must align with one VALUES literal",
                )
            })?;
            (sql_write_key_from_literal::<E>(pk_value, pk_name)?, None)
        }
        None => {
            // Generate a key only on this path, so a row that carries an
            // explicit key never produces a throwaway generated value.
            let Some(pk_value) = sql_write_generated_primary_key_value::<E>() else {
                return Err(QueryError::unsupported_query(format!(
                    "SQL INSERT requires primary key column '{pk_name}' in this release"
                )));
            };
            (
                sql_write_key_from_literal::<E>(&pk_value, pk_name)?,
                Some(pk_value),
            )
        }
    };

    let mut patch = UpdatePatch::new();

    // A generated key is not part of `columns`, so it must be written explicitly.
    if let Some(pk_value) = generated_pk_value {
        patch = patch
            .set_field(E::MODEL, pk_name, pk_value)
            .map_err(QueryError::execute)?;
    }

    for (field, value) in columns.iter().zip(values.iter()) {
        let normalized = sql_write_value_for_field::<E>(field, value)?;
        patch = patch
            .set_field(E::MODEL, field, normalized)
            .map_err(QueryError::execute)?;
    }

    // Both timestamps share one `now` value on insert.
    if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
        let now = Value::Timestamp(Timestamp::now());
        patch = patch
            .set_field(E::MODEL, created_at, now.clone())
            .map_err(QueryError::execute)?;
        patch = patch
            .set_field(E::MODEL, updated_at, now)
            .map_err(QueryError::execute)?;
    }

    Ok((key, patch))
}
/// Builds the structural patch for an UPDATE statement's assignments.
///
/// Assignments are normalized and applied in statement order. When the model
/// declares both system timestamp fields, `updated_at` is set to the current
/// timestamp after all assignments.
///
/// # Errors
///
/// Returns an unsupported-query error when an assignment targets the primary
/// key, and propagates normalization or patch-construction failures.
fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let pk_name = E::MODEL.primary_key.name;

    let assigned = statement
        .assignments
        .iter()
        .try_fold(UpdatePatch::new(), |patch, assignment| {
            let field = assignment.field.as_str();
            if field == pk_name {
                return Err(QueryError::unsupported_query(format!(
                    "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
                )));
            }

            let normalized = sql_write_value_for_field::<E>(field, &assignment.value)?;
            patch
                .set_field(E::MODEL, field, normalized)
                .map_err(QueryError::execute)
        })?;

    match sql_write_system_timestamp_fields::<E>() {
        Some((_, updated_at)) => assigned
            .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
            .map_err(QueryError::execute),
        None => Ok(assigned),
    }
}
/// Builds the typed query that selects the rows an UPDATE statement targets.
///
/// The statement's predicate is canonicalized for the model and used as the
/// filter. The statement's ORDER BY terms are applied in order, and an
/// ascending primary-key order is appended unless one of the terms already
/// orders by the primary key (which includes the case of no terms at all).
/// LIMIT and then OFFSET are applied when present.
///
/// # Errors
///
/// Returns an unsupported-query error when the statement has no WHERE
/// predicate.
fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let raw_predicate = statement.predicate.clone().ok_or_else(|| {
        QueryError::unsupported_query("SQL UPDATE requires WHERE predicate in this release")
    })?;
    let predicate = canonicalize_sql_predicate_for_model(E::MODEL, raw_predicate);
    let pk_name = E::MODEL.primary_key.name;

    let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);

    for term in &statement.order_by {
        let field = term.field.as_str();
        selector = match term.direction {
            SqlOrderDirection::Asc => selector.order_by(field),
            SqlOrderDirection::Desc => selector.order_by_desc(field),
        };
    }

    // With no ORDER BY terms `any` is false, so the primary key becomes the
    // sole ordering; otherwise it is appended as the final ordering term.
    let orders_primary_key = statement
        .order_by
        .iter()
        .any(|term| term.field == pk_name);
    if !orders_primary_key {
        selector = selector.order_by(pk_name);
    }

    if let Some(limit) = statement.limit {
        selector = selector.limit(limit);
    }
    if let Some(offset) = statement.offset {
        selector = selector.offset(offset);
    }

    Ok(selector)
}
/// Executes a parsed INSERT against entity `E` and returns the inserted rows
/// as a projection.
///
/// Tuples are inserted one at a time in statement order; processing stops at
/// the first failure.
///
/// # Errors
///
/// Fails on entity mismatch, on a tuple whose width differs from the column
/// list, and on any per-row patch or mutation error.
fn execute_sql_insert_dispatch<E>(
    &self,
    statement: &SqlInsertStatement,
) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;

    let columns = sql_insert_columns::<E>(statement);
    validate_sql_insert_tuple_lengths(&columns, &statement.values)?;

    let inserted = statement
        .values
        .iter()
        .map(|tuple| {
            let (key, patch) = Self::sql_insert_patch_and_key::<E>(&columns, tuple)?;
            self.mutate_structural::<E>(key, patch, MutationMode::Insert)
                .map_err(QueryError::execute)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Self::sql_write_dispatch_projection(inserted)
}
/// Executes a parsed UPDATE against entity `E` and returns the updated rows
/// as a projection.
///
/// The selector query and the patch are built once. The same patch is then
/// applied to every matched entity in query order, stopping at the first
/// failure.
///
/// # Errors
///
/// Fails on entity mismatch, on selector or patch construction errors, on
/// query execution errors, and on any per-row mutation error.
fn execute_sql_update_dispatch<E>(
    &self,
    statement: &SqlUpdateStatement,
) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;

    // The selector is built before the patch so selector errors are reported first.
    let selector = Self::sql_update_selector_query::<E>(statement)?;
    let patch = Self::sql_update_patch::<E>(statement)?;
    let matched = self.execute_query(&selector)?;

    let updated = matched
        .entities()
        .into_iter()
        .map(|entity| {
            self.mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
                .map_err(QueryError::execute)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Self::sql_write_dispatch_projection(updated)
}
/// Plans a structural query for `authority` and derives the output column
/// labels from the plan's projection spec.
///
/// Returns the labels together with the access plan.
fn prepare_structural_sql_projection_execution(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
    let (_, access_plan) =
        self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;

    let projection_spec = access_plan.projection_spec(authority.model());
    let labels = projection_labels_from_projection_spec(&projection_spec);

    Ok((labels, access_plan))
}
/// Plans and executes a structural SQL projection, returning the rows as a
/// `SqlProjectionPayload` labeled from the plan's projection spec.
///
/// # Errors
///
/// Propagates planning failures and execution failures.
pub(in crate::db::session::sql) fn execute_structural_sql_projection(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
) -> Result<SqlProjectionPayload, QueryError> {
    let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;

    let (rows, row_count) =
        execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
            .map_err(QueryError::execute)?
            .into_parts();

    Ok(SqlProjectionPayload::new(columns, rows, row_count))
}
/// Plans and executes a structural SQL projection through the text-row
/// executor, returning a `ProjectionText` dispatch result.
///
/// # Errors
///
/// Propagates planning failures and execution failures.
fn execute_structural_sql_projection_text(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;

    let (rows, row_count) =
        execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
            .map_err(QueryError::execute)?
            .into_parts();

    Ok(SqlDispatchResult::ProjectionText {
        columns,
        rows,
        row_count,
    })
}
/// Executes a typed DELETE query and returns the rows reported by the delete
/// executor as a projection labeled from the entity model's fields.
///
/// The delete executor runs inside `with_metrics`.
///
/// # Errors
///
/// Propagates compilation, execution and row-conversion failures.
fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let prepared_plan = self
        .compile_query_with_visible_indexes(query)?
        .into_prepared_execution_plan();

    let (kernel_rows, row_count) = self
        .with_metrics(|| {
            self.delete_executor::<E>()
                .execute_structural_projection(prepared_plan)
        })
        .map_err(QueryError::execute)?
        .into_parts();

    let rows = sql_projection_rows_from_kernel_rows(kernel_rows).map_err(QueryError::execute)?;
    let columns = projection_labels_from_fields(E::MODEL.fields());

    Ok(SqlProjectionPayload::new(columns, rows, row_count).into_dispatch_result())
}
/// Lowers a parsed statement for `authority` and extracts the lowered query
/// together with the SELECT projection labels, when there are any.
///
/// Labels are only derived when the lowered statement carries a SELECT query.
///
/// # Errors
///
/// Propagates lowering and label-derivation failures, and returns an
/// unsupported-query error carrying `unsupported_message` when the lowered
/// statement holds no query.
fn lowered_sql_query_dispatch_inputs_for_authority(
    parsed: &SqlParsedStatement,
    authority: EntityAuthority,
    unsupported_message: &'static str,
) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
    let model = authority.model();
    let lowered = parsed.lower_query_lane_for_entity(model.name(), model.primary_key.name)?;

    // Labels are resolved before the query is extracted, so a label error
    // takes precedence over the unsupported-statement error.
    let projection_columns = if matches!(lowered.query(), Some(LoweredSqlQuery::Select(_))) {
        sql_projection_labels_from_select_statement(&parsed.statement)?
    } else {
        None
    };

    match lowered.into_query() {
        Some(query) => Ok((query, projection_columns)),
        None => Err(QueryError::unsupported_query(unsupported_message)),
    }
}
/// Routes a parsed query-lane statement to the matching execution lane.
///
/// Lanes are tried in this order:
/// 1. the dedicated aggregate lane, when the parsed statement requires it;
/// 2. the computed-projection lane, when a computed plan is produced;
/// 3. the lowered query, handed to `dispatch_select` or `dispatch_delete`.
///
/// `dispatch_select` additionally receives whether the lowered query has
/// grouping and the projection labels derived from the statement, if any.
///
/// # Errors
///
/// Propagates failures from whichever lane is taken; `unsupported_message` is
/// used when the statement lowers to no query.
fn dispatch_sql_query_route_for_authority(
    &self,
    parsed: &SqlParsedStatement,
    authority: EntityAuthority,
    unsupported_message: &'static str,
    dispatch_select: impl FnOnce(
        &Self,
        LoweredSelectShape,
        EntityAuthority,
        bool,
        Option<Vec<String>>,
    ) -> Result<SqlDispatchResult, QueryError>,
    dispatch_delete: impl FnOnce(
        &Self,
        LoweredBaseQueryShape,
        EntityAuthority,
    ) -> Result<SqlDispatchResult, QueryError>,
) -> Result<SqlDispatchResult, QueryError> {
    // Lane 1: dedicated aggregate execution.
    if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
        let aggregate_command =
            Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
        let label_override = sql_aggregate_dispatch_label_override(&parsed.statement);
        return self.execute_sql_aggregate_dispatch_for_authority(
            aggregate_command,
            authority,
            label_override,
        );
    }

    // Lane 2: computed projection.
    let computed_plan = computed_projection::computed_sql_projection_plan(&parsed.statement)?;
    if let Some(plan) = computed_plan {
        return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
    }

    // Lane 3: lowered SELECT / DELETE.
    let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
        parsed,
        authority,
        unsupported_message,
    )?;
    let is_grouped = query.has_grouping();

    match query {
        LoweredSqlQuery::Select(select) => {
            dispatch_select(self, select, authority, is_grouped, projection_columns)
        }
        LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
    }
}
/// Routes a parsed EXPLAIN statement and wraps the outcome in
/// `SqlDispatchResult::Explain`.
///
/// A computed-projection explain plan is preferred when one is produced.
/// Otherwise the statement is lowered for `authority` and explained through
/// the execution explainer, falling back to the general lowered explainer
/// when the former yields nothing.
///
/// # Errors
///
/// Propagates planning, lowering and explain failures.
fn dispatch_sql_explain_route_for_authority(
    &self,
    parsed: &SqlParsedStatement,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    let computed = computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?;
    if let Some((mode, plan)) = computed {
        return self
            .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
            .map(SqlDispatchResult::Explain);
    }

    let model = authority.model();
    let lowered = parsed.lower_query_lane_for_entity(model.name(), model.primary_key.name)?;

    let explain = match self.explain_lowered_sql_execution_for_authority(&lowered, authority)? {
        Some(execution_explain) => execution_explain,
        None => self.explain_lowered_sql_for_authority(&lowered, authority)?,
    };

    Ok(SqlDispatchResult::Explain(explain))
}
/// Checks that a query's grouping shape fits the requested surface: the
/// scalar surface requires no grouping, the grouped surface requires grouping.
///
/// # Errors
///
/// Returns an unsupported-query error with the surface-specific message when
/// the shape does not fit.
pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
    query: &Query<E>,
    surface: SqlGroupingSurface,
) -> Result<(), QueryError>
where
    E: EntityKind,
{
    let is_grouped = query.has_grouping();
    let shape_fits_surface = match surface {
        SqlGroupingSurface::Scalar => !is_grouped,
        SqlGroupingSurface::Grouped => is_grouped,
    };

    if shape_fits_surface {
        Ok(())
    } else {
        Err(QueryError::unsupported_query(
            unsupported_sql_grouping_message(surface),
        ))
    }
}
/// Parses `sql` and dispatches the resulting statement against entity `E`.
///
/// # Errors
///
/// Propagates parse failures and dispatch failures.
pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.parse_sql_statement(sql)
        .and_then(|parsed| self.execute_sql_dispatch_parsed::<E>(&parsed))
}
pub fn execute_sql_dispatch_parsed<E>(
&self,
parsed: &SqlParsedStatement,
) -> Result<SqlDispatchResult, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
match parsed.route() {
SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
parsed,
EntityAuthority::for_type::<E>(),
"execute_sql_dispatch accepts SELECT or DELETE only",
|session, select, authority, grouped_surface, projection_columns| {
if grouped_surface {
let columns = projection_columns.ok_or_else(|| {
QueryError::unsupported_query(
"grouped SQL dispatch requires explicit grouped projection items",
)
})?;
return session.execute_lowered_sql_grouped_dispatch_select_core(
select, authority, columns,
);
}
let payload = session.execute_lowered_sql_projection_core(select, authority)?;
if let Some(columns) = projection_columns {
let (_, rows, row_count) = payload.into_parts();
return Ok(SqlProjectionPayload::new(columns, rows, row_count)
.into_dispatch_result());
}
Ok(payload.into_dispatch_result())
},
|session, delete, _authority| {
let typed_query = bind_lowered_sql_query::<E>(
LoweredSqlQuery::Delete(delete),
MissingRowPolicy::Ignore,
)
.map_err(QueryError::from_sql_lowering_error)?;
session.execute_typed_sql_delete(&typed_query)
},
),
SqlStatementRoute::Insert { .. } => {
let SqlStatement::Insert(statement) = &parsed.statement else {
return Err(QueryError::invariant(
"INSERT SQL route must carry parsed INSERT statement",
));
};
self.execute_sql_insert_dispatch::<E>(statement)
}
SqlStatementRoute::Update { .. } => {
let SqlStatement::Update(statement) = &parsed.statement else {
return Err(QueryError::invariant(
"UPDATE SQL route must carry parsed UPDATE statement",
));
};
self.execute_sql_update_dispatch::<E>(statement)
}
SqlStatementRoute::Explain { .. } => self
.dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
SqlStatementRoute::Describe { .. } => {
Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
}
SqlStatementRoute::ShowIndexes { .. } => {
Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
}
SqlStatementRoute::ShowColumns { .. } => {
Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
}
SqlStatementRoute::ShowEntities => {
Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
}
}
}
/// Dispatches a parsed statement on the generated query surface for an
/// already-resolved authority.
///
/// Only query and EXPLAIN routes are accepted. Grouped SELECTs require
/// explicit projection labels. Non-grouped SELECTs run through the text
/// projection path and have their columns replaced by the statement-derived
/// labels when present. DELETE runs through the lowered delete path.
///
/// # Errors
///
/// Returns an unsupported-query error for INSERT, UPDATE, DESCRIBE and SHOW
/// routes, an invariant error when the text path yields a non-text result
/// while labels must be applied, and propagates failures from the selected
/// path.
#[doc(hidden)]
pub fn execute_generated_query_surface_dispatch_for_authority(
    &self,
    parsed: &SqlParsedStatement,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    match parsed.route() {
        SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
            parsed,
            authority,
            "generated SQL query surface requires query or EXPLAIN statement lanes",
            |session, select, authority, is_grouped, projection_columns| {
                if is_grouped {
                    let Some(columns) = projection_columns else {
                        return Err(QueryError::unsupported_query(
                            "grouped SQL dispatch requires explicit grouped projection items",
                        ));
                    };
                    return session
                        .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
                }

                let text_result =
                    session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;

                // Without statement-derived labels the text result is returned untouched.
                let Some(columns) = projection_columns else {
                    return Ok(text_result);
                };

                match text_result {
                    SqlDispatchResult::ProjectionText {
                        rows, row_count, ..
                    } => Ok(SqlDispatchResult::ProjectionText {
                        columns,
                        rows,
                        row_count,
                    }),
                    _ => Err(QueryError::invariant(
                        "generated scalar SQL dispatch text path must emit projection text rows",
                    )),
                }
            },
            |session, delete, authority| {
                session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
            },
        ),
        SqlStatementRoute::Explain { .. } => {
            self.dispatch_sql_explain_route_for_authority(parsed, authority)
        }
        SqlStatementRoute::Insert { .. }
        | SqlStatementRoute::Update { .. }
        | SqlStatementRoute::Describe { .. }
        | SqlStatementRoute::ShowIndexes { .. }
        | SqlStatementRoute::ShowColumns { .. }
        | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
            "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
        )),
    }
}
#[doc(hidden)]
#[must_use]
pub fn execute_generated_query_surface_sql(
&self,
sql: &str,
authorities: &[EntityAuthority],
) -> GeneratedSqlDispatchAttempt {
let sql_trimmed = match trim_generated_query_sql_input(sql) {
Ok(sql_trimmed) => sql_trimmed,
Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
};
let parsed = match self.parse_sql_statement(sql_trimmed) {
Ok(parsed) => parsed,
Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
};
if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
return GeneratedSqlDispatchAttempt::new(
"",
None,
Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
authorities,
))),
);
}
let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
Ok(authority) => authority,
Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
};
let entity_name = authority.model().name();
let explain_order_field = parsed
.route()
.is_explain()
.then_some(authority.model().primary_key.name);
let result = match parsed.route() {
SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
}
SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
Err(QueryError::unsupported_query(
"generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
))
}
SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
self.describe_entity_model(authority.model()),
)),
SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
self.show_indexes_for_store_model(authority.store_path(), authority.model()),
)),
SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
self.show_columns_for_model(authority.model()),
)),
SqlStatementRoute::ShowEntities => unreachable!(
"SHOW ENTITIES is handled before authority resolution for generated query dispatch"
),
};
GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
}
}