use crate::{
db::{
Db,
executor::{
CoveringProjectionMetricsRecorder, ProjectionMaterializationMetricsRecorder,
SharedPreparedExecutionPlan, SharedPreparedProjectionRuntimeParts,
pipeline::execute_initial_scalar_retained_slot_page_for_canister,
projection::{
MaterializedProjectionRows, project_distinct_structural_projection_page,
project_structural_projection_page,
try_execute_covering_projection_rows_for_canister,
},
},
query::plan::LogicalPlan,
},
error::InternalError,
traits::CanisterKind,
value::Value,
};
#[cfg(feature = "sql")]
pub(in crate::db) struct StructuralProjectionRequest {
debug: bool,
prepared_plan: SharedPreparedExecutionPlan,
covering_metrics: CoveringProjectionMetricsRecorder,
materialization_metrics: ProjectionMaterializationMetricsRecorder,
}
#[cfg(feature = "sql")]
impl StructuralProjectionRequest {
pub(in crate::db) const fn new(
debug: bool,
prepared_plan: SharedPreparedExecutionPlan,
covering_metrics: CoveringProjectionMetricsRecorder,
materialization_metrics: ProjectionMaterializationMetricsRecorder,
) -> Self {
Self {
debug,
prepared_plan,
covering_metrics,
materialization_metrics,
}
}
}
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct StructuralProjectionResult {
rows: MaterializedProjectionRows,
}
#[cfg(feature = "sql")]
impl StructuralProjectionResult {
const fn new(rows: MaterializedProjectionRows) -> Self {
Self { rows }
}
#[must_use]
pub(in crate::db) fn into_value_rows(self) -> Vec<Vec<Value>> {
self.rows.into_value_rows()
}
#[must_use]
pub(in crate::db) fn row_count(&self) -> u32 {
u32::try_from(self.rows.len()).unwrap_or(u32::MAX)
}
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_structural_projection_result<C>(
db: &Db<C>,
request: StructuralProjectionRequest,
) -> Result<StructuralProjectionResult, InternalError>
where
C: CanisterKind,
{
let StructuralProjectionRequest {
debug,
prepared_plan,
covering_metrics,
materialization_metrics,
} = request;
let SharedPreparedProjectionRuntimeParts {
authority,
plan,
prepared_projection_shape,
} = prepared_plan.into_projection_runtime_parts();
let mut execution_plan = plan.clone();
if execution_plan.scalar_plan().distinct {
match &mut execution_plan.logical {
LogicalPlan::Scalar(scalar) => scalar.page = None,
LogicalPlan::Grouped(grouped) => grouped.scalar.page = None,
}
}
let distinct = execution_plan.scalar_plan().distinct;
if !distinct
&& let Some(projected) = try_execute_covering_projection_rows_for_canister(
db,
authority,
&execution_plan,
covering_metrics,
)?
{
let rows = MaterializedProjectionRows::from_value_rows(projected.into_value_rows());
return Ok(StructuralProjectionResult::new(rows));
}
let row_layout = authority.row_layout();
let prepared_projection = prepared_projection_shape.as_deref().ok_or_else(|| {
InternalError::query_executor_invariant(
"structural projection runtime requires one frozen projection shape",
)
})?;
let page = execute_initial_scalar_retained_slot_page_for_canister(
db,
debug,
authority,
execution_plan,
)?;
let rows = if distinct {
project_distinct_structural_projection_page(
row_layout,
prepared_projection,
&plan,
page,
materialization_metrics,
)?
} else {
project_structural_projection_page(
row_layout,
prepared_projection,
page,
materialization_metrics,
)?
};
Ok(StructuralProjectionResult::new(rows))
}