use crate::{
db::{
Db,
data::{CanonicalSlotReader, DataRow, StructuralSlotReader},
executor::{
EntityAuthority,
pipeline::entrypoints::execute_initial_scalar_rows_for_canister,
projection::{
ProjectionEvalError,
eval::eval_canonical_scalar_projection_expr,
materialize::{
prepare_projection_plan, visit_prepared_projection_values_with_value_reader,
visit_projection_values_with_required_value_reader,
},
},
},
query::plan::{
AccessPlannedQuery,
expr::{ProjectionField, ProjectionSpec, projection_field_direct_field_name},
},
},
error::InternalError,
model::entity::{EntityModel, resolve_field_slot},
traits::CanisterKind,
value::Value,
};
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct SqlProjectionRows {
rows: Vec<Vec<Value>>,
row_count: u32,
}
#[cfg(feature = "sql")]
impl SqlProjectionRows {
#[must_use]
pub(in crate::db) const fn new(rows: Vec<Vec<Value>>, row_count: u32) -> Self {
Self { rows, row_count }
}
#[must_use]
pub(in crate::db) fn into_parts(self) -> (Vec<Vec<Value>>, u32) {
(self.rows, self.row_count)
}
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_sql_projection_rows_for_canister<C>(
db: &Db<C>,
debug: bool,
model: &'static EntityModel,
projection: ProjectionSpec,
authority: EntityAuthority,
plan: AccessPlannedQuery,
) -> Result<SqlProjectionRows, InternalError>
where
C: CanisterKind,
{
let page = execute_initial_scalar_rows_for_canister(db, debug, authority, plan)?;
let (slot_rows, data_rows) = page.into_sql_parts();
let projected = if let Some(slot_rows) = slot_rows {
project_slot_rows_from_projection_structural(model, &projection, slot_rows)?
} else {
project_data_rows_from_projection_structural(model, &projection, data_rows.as_slice())?
};
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
Ok(SqlProjectionRows::new(projected, row_count))
}
fn project_slot_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: Vec<Vec<Option<Value>>>,
) -> Result<Vec<Vec<Value>>, InternalError> {
if let Some(field_slots) = direct_projection_field_slots(model, projection) {
return project_slot_rows_from_direct_field_slots(rows, field_slots.as_slice());
}
let prepared = prepare_projection_plan(model, projection);
let mut projected_rows = Vec::with_capacity(rows.len());
for row in &rows {
let mut values = Vec::with_capacity(projection.len());
let mut read_slot = |slot: usize| row.get(slot).cloned().flatten();
visit_prepared_projection_values_with_value_reader(
&prepared,
projection,
model,
&mut read_slot,
&mut |value| values.push(value),
)
.map_err(
crate::db::executor::projection::ProjectionEvalError::into_invalid_logical_plan_internal_error,
)?;
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn direct_projection_field_slots(
model: &'static EntityModel,
projection: &ProjectionSpec,
) -> Option<Vec<(String, usize)>> {
let mut field_slots = Vec::with_capacity(projection.len());
for field in projection.fields() {
match field {
ProjectionField::Scalar { .. } => {
let field_name = projection_field_direct_field_name(field)?;
let slot = resolve_field_slot(model, field_name)?;
if field_slots
.iter()
.any(|(_, existing_slot)| *existing_slot == slot)
{
return None;
}
field_slots.push((field_name.to_string(), slot));
}
}
}
Some(field_slots)
}
#[cfg(feature = "sql")]
fn project_slot_rows_from_direct_field_slots(
rows: Vec<Vec<Option<Value>>>,
field_slots: &[(String, usize)],
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(rows.len());
for mut row in rows {
let mut values = Vec::with_capacity(field_slots.len());
for (field_name, slot) in field_slots {
let value = row
.get_mut(*slot)
.and_then(Option::take)
.ok_or_else(|| ProjectionEvalError::MissingFieldValue {
field: field_name.clone(),
index: *slot,
})
.map_err(ProjectionEvalError::into_invalid_logical_plan_internal_error)?;
values.push(value);
}
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn project_data_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: &[DataRow],
) -> Result<Vec<Vec<Value>>, InternalError> {
match prepare_projection_plan(model, projection) {
super::PreparedProjectionPlan::Generic => {
project_generic_data_rows_from_projection_structural(model, projection, rows)
}
super::PreparedProjectionPlan::Scalar(compiled_fields) => {
project_scalar_data_rows_from_projection_structural(
compiled_fields.as_slice(),
rows,
model,
)
}
}
}
#[cfg(feature = "sql")]
fn project_scalar_data_rows_from_projection_structural(
compiled_fields: &[crate::db::executor::projection::ScalarProjectionExpr],
rows: &[DataRow],
model: &'static EntityModel,
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(rows.len());
for (data_key, raw_row) in rows {
let row_fields = StructuralSlotReader::from_raw_row(raw_row, model)?;
row_fields.validate_storage_key(data_key)?;
let mut values = Vec::with_capacity(compiled_fields.len());
for compiled in compiled_fields {
let value = eval_canonical_scalar_projection_expr(compiled, &row_fields)?;
values.push(value);
}
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn project_generic_data_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: &[DataRow],
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(rows.len());
for (data_key, raw_row) in rows {
let row_fields = StructuralSlotReader::from_raw_row(raw_row, model)?;
row_fields.validate_storage_key(data_key)?;
let mut values = Vec::with_capacity(projection.len());
let mut slot_cache: Vec<Option<Value>> = vec![None; model.fields().len()];
let mut read_slot = |slot: usize| {
if slot_cache[slot].is_none() {
slot_cache[slot] = Some(row_fields.required_value_by_contract(slot)?);
}
slot_cache[slot].clone().ok_or_else(|| {
InternalError::executor_internal(format!(
"structural projection slot cache missing decoded value: slot={slot}",
))
})
};
visit_projection_values_with_required_value_reader(
projection,
model,
&mut read_slot,
&mut |value| values.push(value),
)?;
projected_rows.push(values);
}
Ok(projected_rows)
}