use crate::{
db::{
Db,
executor::{
CoveringProjectionMetricsRecorder, ProjectionMaterializationMetricsRecorder,
SharedPreparedExecutionPlan, SharedPreparedProjectionRuntimeParts,
pipeline::execute_initial_scalar_retained_slot_page_from_runtime_parts_for_canister,
projection::{
MaterializedProjectionRows, project, project_distinct,
try_execute_covering_projection_rows_for_canister,
},
saturating_u32_len,
},
},
error::InternalError,
traits::CanisterKind,
value::Value,
};
/// Owned inputs for one structural projection execution.
///
/// Built through `StructuralProjectionRequest::new` and consumed by
/// `execute_structural_projection_result`, which destructures it and hands
/// each field to the executor stage that needs it.
#[cfg(feature = "sql")]
pub(in crate::db) struct StructuralProjectionRequest {
/// Flag forwarded unchanged to the scalar retained-slot page executor.
debug: bool,
/// Shared prepared plan; converted into projection runtime parts at execution time.
prepared_plan: SharedPreparedExecutionPlan,
/// Recorder handed to the covering projection attempt.
covering_metrics: CoveringProjectionMetricsRecorder,
/// Recorder handed to `project` / `project_distinct` during row materialization.
materialization_metrics: ProjectionMaterializationMetricsRecorder,
}
#[cfg(feature = "sql")]
impl StructuralProjectionRequest {
/// Bundles the debug flag, the prepared plan, and both metrics recorders
/// into one request value.
///
/// Performs no validation: every argument is stored exactly as given.
pub(in crate::db) const fn new(
debug: bool,
prepared_plan: SharedPreparedExecutionPlan,
covering_metrics: CoveringProjectionMetricsRecorder,
materialization_metrics: ProjectionMaterializationMetricsRecorder,
) -> Self {
Self {
debug,
prepared_plan,
covering_metrics,
materialization_metrics,
}
}
}
/// Materialized rows produced by one structural projection execution.
///
/// Returned by `execute_structural_projection_result` from either the covering
/// projection path or the projection of the executed retained-slot page.
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct StructuralProjectionResult {
/// The projected rows, kept in their materialized container until a caller
/// converts them with `into_value_rows`.
rows: MaterializedProjectionRows,
}
#[cfg(feature = "sql")]
impl StructuralProjectionResult {
    /// Wraps already-materialized projection rows in a result value.
    const fn new(rows: MaterializedProjectionRows) -> Self {
        Self { rows }
    }

    /// Consumes the result and returns its rows as nested `Value` vectors,
    /// one inner vector per row.
    #[must_use]
    pub(in crate::db) fn into_value_rows(self) -> Vec<Vec<Value>> {
        let Self { rows } = self;
        rows.into_value_rows()
    }

    /// Returns the number of materialized rows as a `u32`.
    ///
    /// The length is converted through `saturating_u32_len`
    /// (presumably clamping at `u32::MAX` — confirm against that helper).
    #[must_use]
    pub(in crate::db) fn row_count(&self) -> u32 {
        let materialized_len = self.rows.len();
        saturating_u32_len(materialized_len)
    }
}
/// Runs one structural projection request against `db` and returns its
/// materialized rows.
///
/// Steps, in order:
/// 1. The prepared plan is split into its projection runtime parts.
/// 2. For distinct plans the logical plan is cloned and the scalar runtime is
///    rewrapped via `into_scalar_page_suppressed`.
/// 3. Non-distinct plans first try the covering projection path; any rows it
///    yields are returned immediately.
/// 4. Otherwise the initial scalar retained-slot page is executed and then
///    projected, through `project_distinct` for distinct plans and `project`
///    for all others.
///
/// # Errors
///
/// Propagates every error raised by runtime-part extraction, the covering
/// attempt, page execution, and projection. Additionally returns a
/// query-executor invariant error when the prepared plan carries no frozen
/// projection shape, or when a distinct plan has no captured logical plan.
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_structural_projection_result<C>(
    db: &Db<C>,
    request: StructuralProjectionRequest,
) -> Result<StructuralProjectionResult, InternalError>
where
    C: CanisterKind,
{
    let StructuralProjectionRequest {
        debug: emit_debug,
        prepared_plan,
        covering_metrics,
        materialization_metrics,
    } = request;
    let SharedPreparedProjectionRuntimeParts {
        authority,
        prepared_projection_shape: frozen_shape,
        scalar_runtime: runtime,
    } = prepared_plan.into_projection_runtime_parts()?;

    // Distinct plans need their own copy of the logical plan for the final
    // projection step, because the runtime itself is consumed by page execution.
    // NOTE(review): `into_scalar_page_suppressed` presumably turns off scalar
    // paging so DISTINCT sees every row — confirm against its definition.
    let is_distinct = runtime.plan_core.plan().scalar_plan().distinct;
    let (distinct_plan, runtime) = if is_distinct {
        let logical_plan = runtime.plan_core.plan().clone();
        (Some(logical_plan), runtime.into_scalar_page_suppressed())
    } else {
        (None, runtime)
    };

    // The covering path is only attempted for non-distinct plans.
    let execution_plan = runtime.plan_core.plan();
    if !is_distinct {
        let covered = try_execute_covering_projection_rows_for_canister(
            db,
            authority,
            execution_plan,
            covering_metrics,
        )?;
        if let Some(projected) = covered {
            let value_rows = projected.into_value_rows();
            return Ok(StructuralProjectionResult::new(
                MaterializedProjectionRows::from_value_rows(value_rows),
            ));
        }
    }

    // Validate the frozen shape before executing the page so a missing shape
    // fails without running the scan.
    let row_layout = authority.row_layout();
    let Some(prepared_projection) = frozen_shape.as_deref() else {
        return Err(InternalError::query_executor_invariant(
            "structural projection runtime requires one frozen projection shape",
        ));
    };

    let retained_page = execute_initial_scalar_retained_slot_page_from_runtime_parts_for_canister(
        db,
        emit_debug,
        runtime,
        is_distinct,
    )?;

    let materialized = if is_distinct {
        let Some(logical_plan) = distinct_plan.as_ref() else {
            return Err(InternalError::query_executor_invariant(
                "distinct projection materialization requires logical plan metadata",
            ));
        };
        project_distinct(
            row_layout,
            prepared_projection,
            logical_plan,
            retained_page,
            materialization_metrics,
        )?
    } else {
        project(
            row_layout,
            prepared_projection,
            retained_page,
            materialization_metrics,
        )?
    };

    Ok(StructuralProjectionResult::new(materialized))
}