mod materialize;
#[cfg(all(feature = "sql", test))]
mod tests;
#[cfg(any(test, feature = "diagnostics"))]
use crate::db::executor::{
CoveringProjectionMetricsRecorder, ProjectionMaterializationMetricsRecorder,
};
#[cfg(feature = "sql")]
use crate::{
db::{
Db,
executor::{
SharedPreparedExecutionPlan, SharedPreparedProjectionRuntimeParts,
pipeline::execute_initial_scalar_retained_slot_page_for_canister,
project_structural_projection_page, try_execute_covering_projection_rows_for_canister,
},
query::plan::LogicalPlan,
session::sql::projection::runtime::materialize::{
finalize_sql_projection_rows, project_distinct_sql_projection_from_structural_page,
},
},
error::InternalError,
traits::CanisterKind,
value::Value,
};
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use crate::db::session::sql::projection::runtime::materialize::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
#[cfg(feature = "diagnostics")]
pub use crate::db::session::sql::projection::runtime::materialize::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct SqlProjectionRows {
rows: Vec<Vec<Value>>,
row_count: u32,
}
#[cfg(feature = "sql")]
impl SqlProjectionRows {
#[must_use]
pub(in crate::db) const fn new(rows: Vec<Vec<Value>>, row_count: u32) -> Self {
Self { rows, row_count }
}
#[must_use]
pub(in crate::db) fn into_parts(self) -> (Vec<Vec<Value>>, u32) {
(self.rows, self.row_count)
}
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) fn current_pure_covering_decode_local_instructions() -> u64 {
crate::db::executor::current_pure_covering_decode_local_instructions()
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) fn current_pure_covering_row_assembly_local_instructions() -> u64 {
crate::db::executor::current_pure_covering_row_assembly_local_instructions()
}
#[cfg(any(test, feature = "diagnostics"))]
fn covering_projection_metrics_recorder() -> CoveringProjectionMetricsRecorder {
CoveringProjectionMetricsRecorder::new(
materialize::record_sql_projection_hybrid_covering_path_hit,
materialize::record_sql_projection_hybrid_covering_index_field_access,
materialize::record_sql_projection_hybrid_covering_row_field_access,
)
}
#[cfg(not(any(test, feature = "diagnostics")))]
const fn covering_projection_metrics_recorder()
-> crate::db::executor::CoveringProjectionMetricsRecorder {
crate::db::executor::CoveringProjectionMetricsRecorder::new()
}
#[cfg(any(test, feature = "diagnostics"))]
fn projection_materialization_metrics_recorder() -> ProjectionMaterializationMetricsRecorder {
ProjectionMaterializationMetricsRecorder::new(
materialize::record_sql_projection_slot_rows_path_hit,
materialize::record_sql_projection_data_rows_path_hit,
materialize::record_sql_projection_data_rows_scalar_fallback_hit,
materialize::record_sql_projection_data_rows_slot_access,
)
}
#[cfg(not(any(test, feature = "diagnostics")))]
const fn projection_materialization_metrics_recorder()
-> crate::db::executor::ProjectionMaterializationMetricsRecorder {
crate::db::executor::ProjectionMaterializationMetricsRecorder::new()
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_sql_projection_rows_for_canister<C>(
db: &Db<C>,
debug: bool,
prepared_plan: SharedPreparedExecutionPlan,
) -> Result<SqlProjectionRows, InternalError>
where
C: CanisterKind,
{
let SharedPreparedProjectionRuntimeParts {
authority,
plan,
prepared_projection_shape,
} = prepared_plan.into_projection_runtime_parts();
let mut execution_plan = plan.clone();
if execution_plan.scalar_plan().distinct {
match &mut execution_plan.logical {
LogicalPlan::Scalar(scalar) => scalar.page = None,
LogicalPlan::Grouped(grouped) => grouped.scalar.page = None,
}
}
let distinct = execution_plan.scalar_plan().distinct;
if !distinct
&& let Some(projected) = try_execute_covering_projection_rows_for_canister(
db,
authority,
&execution_plan,
covering_projection_metrics_recorder(),
)?
{
let projected = finalize_sql_projection_rows(&plan, projected.into_value_rows())?;
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
return Ok(SqlProjectionRows::new(projected, row_count));
}
let row_layout = authority.row_layout();
let prepared_projection = prepared_projection_shape.as_deref().ok_or_else(|| {
InternalError::query_executor_invariant(
"SQL projection runtime requires one frozen structural projection shape",
)
})?;
let page = execute_initial_scalar_retained_slot_page_for_canister(
db,
debug,
authority,
execution_plan,
)?;
let projected = if distinct {
project_distinct_sql_projection_from_structural_page(
row_layout,
prepared_projection,
&plan,
page,
)?
} else {
let projected = project_structural_projection_page(
row_layout,
prepared_projection,
page,
projection_materialization_metrics_recorder(),
)?;
finalize_sql_projection_rows(&plan, projected.into_value_rows())?
};
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
Ok(SqlProjectionRows::new(projected, row_count))
}