use crate::{
db::{
DbSession, MissingRowPolicy, QueryError,
executor::{EntityAuthority, pipeline::execute_initial_grouped_rows_for_canister},
query::intent::StructuralQuery,
session::sql::{
SqlCacheAttribution, SqlCompiledCommandCacheKey, SqlStatementResult,
projection::{SqlProjectionPayload, grouped_sql_statement_result},
},
sql::lowering::{LoweredSelectShape, bind_lowered_sql_select_query_structural},
},
traits::CanisterKind,
};
impl<C: CanisterKind> DbSession<C> {
    /// Binds a lowered SQL `SELECT` shape into a [`StructuralQuery`].
    ///
    /// The shape is bound against `authority.model()` using
    /// [`MissingRowPolicy::Ignore`].
    ///
    /// # Errors
    ///
    /// A binding failure is converted with
    /// `QueryError::from_sql_lowering_error`.
    fn structural_query_from_lowered_select(
        select: LoweredSelectShape,
        authority: EntityAuthority,
    ) -> Result<crate::db::query::intent::StructuralQuery, QueryError> {
        let model = authority.model();
        let bound =
            bind_lowered_sql_select_query_structural(model, select, MissingRowPolicy::Ignore);

        bound.map_err(QueryError::from_sql_lowering_error)
    }

    /// Binds `select` into a structural query and passes it to
    /// `execute_structural` together with this session and `authority`.
    ///
    /// # Errors
    ///
    /// Returns the binding error without invoking `execute_structural` when
    /// the lowered shape cannot be bound; otherwise returns whatever the
    /// callback returns.
    fn execute_lowered_sql_select_with<T>(
        &self,
        select: LoweredSelectShape,
        authority: EntityAuthority,
        execute_structural: impl FnOnce(
            &Self,
            StructuralQuery,
            EntityAuthority,
        ) -> Result<T, QueryError>,
    ) -> Result<T, QueryError> {
        Self::structural_query_from_lowered_select(select, authority)
            .and_then(|structural| execute_structural(self, structural, authority))
    }

    /// Runs a lowered `SELECT` through `execute_structural_sql_projection`
    /// and returns only the projection payload.
    ///
    /// `None` is passed as the final argument of the projection call, and the
    /// second element of the tuple it returns is dropped.
    // NOTE(review): the `None` argument and the dropped element presumably
    // relate to the compiled-command cache — confirm against the callee.
    ///
    /// # Errors
    ///
    /// Propagates binding and projection-execution errors unchanged.
    #[inline(never)]
    pub(in crate::db::session::sql::execute) fn execute_lowered_sql_projection_core(
        &self,
        select: LoweredSelectShape,
        authority: EntityAuthority,
    ) -> Result<SqlProjectionPayload, QueryError> {
        self.execute_lowered_sql_select_with(select, authority, |this, structural, auth| {
            let (payload, _) = this.execute_structural_sql_projection(structural, auth, None)?;
            Ok(payload)
        })
    }

    /// Plans and executes a grouped structural `SELECT`, returning the
    /// statement result together with the cache attribution reported by
    /// planning.
    ///
    /// The first grouped page is produced by
    /// `execute_initial_grouped_rows_for_canister`. If that page carries a
    /// continuation cursor, it is hex-encoded and included in the result.
    ///
    /// # Errors
    ///
    /// * Planning errors from `planned_sql_select_with_visibility` propagate
    ///   unchanged.
    /// * Execution errors are wrapped with `QueryError::execute`.
    /// * `QueryError::grouped_paged_emitted_scalar_continuation` when the
    ///   page's cursor is not a grouped cursor.
    /// * `QueryError::serialize_internal` when the grouped cursor cannot be
    ///   hex-encoded.
    #[inline(never)]
    pub(in crate::db::session::sql::execute) fn execute_structural_sql_grouped_statement_select_core(
        &self,
        structural: StructuralQuery,
        authority: EntityAuthority,
        compiled_cache_key: Option<&SqlCompiledCommandCacheKey>,
    ) -> Result<(SqlStatementResult, SqlCacheAttribution), QueryError> {
        let (plan_entry, attribution) =
            self.planned_sql_select_with_visibility(&structural, authority, compiled_cache_key)?;
        let (prepared, columns, scales) = plan_entry.into_parts();

        // The executor takes the logical plan by value, so hand it a copy.
        let logical_plan = prepared.logical_plan().clone();
        let page = execute_initial_grouped_rows_for_canister(
            &self.db,
            self.debug,
            authority,
            logical_plan,
        )
        .map_err(QueryError::execute)?;

        let next_cursor = match page.next_cursor {
            None => None,
            Some(cursor) => {
                // A grouped page must only ever continue with a grouped cursor.
                let token = cursor
                    .as_grouped()
                    .ok_or_else(|| QueryError::grouped_paged_emitted_scalar_continuation())?;
                let encoded = token.encode_hex().map_err(|err| {
                    QueryError::serialize_internal(format!(
                        "failed to serialize grouped continuation cursor: {err}"
                    ))
                })?;
                Some(encoded)
            }
        };

        let statement = grouped_sql_statement_result(columns, scales, page.rows, next_cursor);
        Ok((statement, attribution))
    }
}