use crate::{
db::{
DbSession, MissingRowPolicy, QueryError,
executor::{
EntityAuthority, execute_initial_grouped_rows_for_canister,
execute_sql_delete_projection_for_canister,
},
session::sql::{
SqlDispatchResult,
projection::{
SqlProjectionPayload, projection_labels_from_fields,
sql_projection_rows_from_kernel_rows,
},
surface::{SqlSurface, session_sql_lane, unsupported_sql_lane_message},
},
sql::lowering::{
LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlCommand, LoweredSqlQuery,
bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
},
},
traits::CanisterKind,
value::Value,
};
/// Row-shaped SQL result returned by `execute_lowered_sql_projection_for_authority`:
/// `(columns, rows, row_count)`, i.e. the column labels, one `Vec<Value>` per row, and
/// the reported row count.
type SqlQuerySurfaceRowParts = (Vec<String>, Vec<Vec<Value>>, u32);
impl<C: CanisterKind> DbSession<C> {
/// Binds a lowered SQL `SELECT` shape against the model carried by `authority`.
///
/// The binder is always invoked with `MissingRowPolicy::Ignore`.
///
/// # Errors
///
/// Any binder failure is converted with `QueryError::from_sql_lowering_error`.
fn structural_query_from_lowered_select(
    select: LoweredSelectShape,
    authority: EntityAuthority,
) -> Result<crate::db::query::intent::StructuralQuery, QueryError> {
    let bound = bind_lowered_sql_select_query_structural(
        authority.model(),
        select,
        MissingRowPolicy::Ignore,
    );

    bound.map_err(QueryError::from_sql_lowering_error)
}
/// Binds `select` into a structural query and executes it as a SQL projection.
///
/// Returns the resulting `SqlProjectionPayload`; binding and execution errors are
/// propagated unchanged.
#[inline(never)]
fn execute_lowered_sql_projection_core(
    &self,
    select: LoweredSelectShape,
    authority: EntityAuthority,
) -> Result<SqlProjectionPayload, QueryError> {
    Self::structural_query_from_lowered_select(select, authority)
        .and_then(|structural| self.execute_structural_sql_projection(structural, authority))
}
/// Executes a lowered SQL `SELECT` and wraps the projection payload as a
/// `SqlDispatchResult` via `SqlProjectionPayload::into_dispatch_result`.
///
/// Errors from `execute_lowered_sql_projection_core` are propagated unchanged.
#[inline(never)]
pub(in crate::db::session::sql::dispatch) fn execute_lowered_sql_dispatch_select_core(
    &self,
    select: LoweredSelectShape,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    let payload = self.execute_lowered_sql_projection_core(select, authority)?;

    Ok(payload.into_dispatch_result())
}
/// Executes a lowered SQL `SELECT` through the text projection path.
///
/// The select shape is bound into a structural query first, then handed to
/// `execute_structural_sql_projection_text`; errors from either step are propagated.
#[inline(never)]
pub(in crate::db::session::sql::dispatch) fn execute_lowered_sql_dispatch_select_text_core(
    &self,
    select: LoweredSelectShape,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    Self::structural_query_from_lowered_select(select, authority).and_then(|structural| {
        self.execute_structural_sql_projection_text(structural, authority)
    })
}
/// Executes a grouped SQL `SELECT` and packages the initial page as
/// `SqlDispatchResult::Grouped`.
///
/// `columns` is passed through untouched as the result's column labels. When the page
/// carries a continuation cursor, it is hex-encoded into `next_cursor`.
///
/// # Errors
///
/// - Binding, visible-index lookup and plan construction errors are propagated.
/// - Execution failures are mapped through `QueryError::execute`.
/// - `QueryError::grouped_paged_emitted_scalar_continuation` when the page's cursor
///   is not a grouped cursor.
/// - `QueryError::serialize_internal` when the grouped cursor cannot be hex-encoded.
#[inline(never)]
pub(in crate::db::session::sql::dispatch) fn execute_lowered_sql_grouped_dispatch_select_core(
    &self,
    select: LoweredSelectShape,
    authority: EntityAuthority,
    columns: Vec<String>,
) -> Result<SqlDispatchResult, QueryError> {
    let structural = Self::structural_query_from_lowered_select(select, authority)?;
    let visible_indexes =
        self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
    let plan = structural.build_plan_with_visible_indexes(&visible_indexes)?;

    let page = execute_initial_grouped_rows_for_canister(&self.db, self.debug, authority, plan)
        .map_err(QueryError::execute)?;

    // Only a grouped continuation token is acceptable on this path.
    let next_cursor = match page.next_cursor {
        Some(cursor) => {
            let Some(token) = cursor.as_grouped() else {
                return Err(QueryError::grouped_paged_emitted_scalar_continuation());
            };
            let encoded = token.encode_hex().map_err(|err| {
                QueryError::serialize_internal(format!(
                    "failed to serialize grouped continuation cursor: {err}"
                ))
            })?;
            Some(encoded)
        }
        None => None,
    };

    let rows = page.rows;
    // Saturate instead of failing if the row count does not fit in `u32`.
    let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);

    Ok(SqlDispatchResult::Grouped {
        columns,
        rows,
        row_count,
        next_cursor,
    })
}
/// Executes a lowered SQL `DELETE` and returns the deleted rows as a projection-shaped
/// `SqlDispatchResult`.
///
/// Column labels are derived from `authority.fields()`; the rows and row count come
/// from the delete executor's result.
///
/// # Errors
///
/// Visible-index lookup and plan construction errors are propagated; execution
/// failures are mapped through `QueryError::execute`.
pub(in crate::db::session::sql::dispatch) fn execute_lowered_sql_dispatch_delete_core(
    &self,
    delete: &LoweredBaseQueryShape,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    // The input is borrowed, so a clone is handed to the binder.
    // NOTE(review): unlike the SELECT binder, this result is used without any error
    // mapping — presumably the delete binder is infallible; confirm against its signature.
    let structural = bind_lowered_sql_delete_query_structural(
        authority.model(),
        delete.clone(),
        MissingRowPolicy::Ignore,
    );
    let visible_indexes =
        self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
    let plan = structural.build_plan_with_visible_indexes(&visible_indexes)?;

    let deleted = execute_sql_delete_projection_for_canister(&self.db, authority, plan)
        .map_err(QueryError::execute)?;

    let (kernel_rows, row_count) = deleted.into_parts();
    let rows = sql_projection_rows_from_kernel_rows(kernel_rows);
    let labels = projection_labels_from_fields(authority.fields());
    let payload = SqlProjectionPayload::new(labels, rows, row_count);

    Ok(payload.into_dispatch_result())
}
/// Executes an already-lowered SQL command for the given `authority`.
///
/// This is a thin public entry point that forwards both arguments to
/// `execute_lowered_sql_dispatch_query_text_core` and returns its result unchanged,
/// so SELECT commands go through the text projection path and commands that are not
/// queries yield an unsupported-query error.
#[doc(hidden)]
pub fn execute_lowered_sql_dispatch_query_for_authority(
&self,
lowered: LoweredSqlCommand,
authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
self.execute_lowered_sql_dispatch_query_text_core(lowered, authority)
}
/// Executes a lowered SQL `SELECT` or `DELETE` and returns the result as
/// `(columns, rows, row_count)`.
///
/// `lowered` is only borrowed, so the select shape is cloned before execution.
///
/// # Errors
///
/// - `QueryError::unsupported_query` (with the lane message for
///   `SqlSurface::QueryFrom`) when `lowered` carries no query.
/// - `QueryError::unsupported_query` when the delete path yields anything other
///   than `SqlDispatchResult::Projection`.
/// - Any error from the underlying select/delete execution is propagated.
#[doc(hidden)]
pub fn execute_lowered_sql_projection_for_authority(
    &self,
    lowered: &LoweredSqlCommand,
    authority: EntityAuthority,
) -> Result<SqlQuerySurfaceRowParts, QueryError> {
    let query = lowered.query().ok_or_else(|| {
        QueryError::unsupported_query(unsupported_sql_lane_message(
            SqlSurface::QueryFrom,
            session_sql_lane(lowered),
        ))
    })?;

    match query {
        LoweredSqlQuery::Select(select) => {
            let payload = self.execute_lowered_sql_projection_core(select.clone(), authority)?;
            Ok(payload.into_parts())
        }
        LoweredSqlQuery::Delete(delete) => {
            // Listed exhaustively so a new `SqlDispatchResult` variant forces a decision here.
            match self.execute_lowered_sql_dispatch_delete_core(delete, authority)? {
                SqlDispatchResult::Projection {
                    columns,
                    rows,
                    row_count,
                } => Ok((columns, rows, row_count)),
                SqlDispatchResult::Grouped { .. }
                | SqlDispatchResult::ProjectionText { .. }
                | SqlDispatchResult::Explain(_)
                | SqlDispatchResult::Describe(_)
                | SqlDispatchResult::ShowIndexes(_)
                | SqlDispatchResult::ShowColumns(_)
                | SqlDispatchResult::ShowEntities(_) => Err(QueryError::unsupported_query(
                    "generated SQL query dispatch requires row-shaped SELECT or DELETE",
                )),
            }
        }
    }
}
/// Dispatches an owned lowered SQL command: `SELECT` goes to the text projection
/// path, `DELETE` goes to the delete path.
///
/// # Errors
///
/// Returns `QueryError::unsupported_query` (with the lane message for
/// `SqlSurface::QueryFrom`) when `lowered` carries no query; otherwise propagates
/// whatever the selected path returns.
fn execute_lowered_sql_dispatch_query_text_core(
    &self,
    lowered: LoweredSqlCommand,
    authority: EntityAuthority,
) -> Result<SqlDispatchResult, QueryError> {
    // The lane must be read before `into_query` consumes `lowered`.
    let lane = session_sql_lane(&lowered);

    match lowered.into_query() {
        Some(LoweredSqlQuery::Select(select)) => {
            self.execute_lowered_sql_dispatch_select_text_core(select, authority)
        }
        Some(LoweredSqlQuery::Delete(delete)) => {
            self.execute_lowered_sql_dispatch_delete_core(&delete, authority)
        }
        None => Err(QueryError::unsupported_query(unsupported_sql_lane_message(
            SqlSurface::QueryFrom,
            lane,
        ))),
    }
}
}