use crate::{
db::{
executor::projection::eval::{
ProjectionEvalError, eval_compiled_expr_with_value_ref_reader,
},
query::plan::{
AccessPlannedQuery,
expr::{CompiledExpr, ProjectionSpec},
},
},
error::InternalError,
model::{
entity::EntityModel,
field::{LeafCodec, ScalarCodec},
},
value::Value,
};
/// Prepared projection program carried by a `PreparedProjectionShape`.
///
/// `Scalar` is the only variant, which is why callers in this file can
/// destructure it with an irrefutable `let`.
#[derive(Debug)]
pub(in crate::db) enum PreparedProjectionPlan {
/// Compiled scalar projection expressions, stored in order.
Scalar(Vec<CompiledExpr>),
}
/// Projection state bundled for execution: the projection spec, its compiled
/// program, and slot lookups derived from both.
///
/// Built by `prepare_projection_shape_from_plan`, or by `from_test_parts` in
/// test builds. The field notes below describe what
/// `prepare_projection_shape_from_plan` stores; `from_test_parts` accepts
/// arbitrary caller-supplied values.
#[derive(Debug)]
pub(in crate::db) struct PreparedProjectionShape {
/// Projection spec, cloned from the plan's frozen projection spec.
projection: ProjectionSpec,
/// Compiled projection program for `projection`.
prepared: PreparedProjectionPlan,
/// Copied from the plan; when `true`, `validate_prepared_projection_row`
/// returns `Ok(())` without evaluating any expression.
projection_is_model_identity: bool,
/// `(field name, slot)` pairs pairing projection fields with the plan's
/// frozen direct projection slots. `None` when the plan exposes no such
/// slots or when a paired field has no direct field name.
retained_slot_direct_projection_field_slots: Option<Vec<(String, usize)>>,
/// One entry per compiled expression: `Some(slot)` when the expression
/// reports a direct octet-length slot whose leaf codec is scalar `Blob` or
/// `Text`, otherwise `None`. Left empty when no expression qualifies.
retained_slot_direct_octet_length_projection_slots: Vec<Option<usize>>,
/// `(field name, slot)` pairs resolved through the entity model. `None`
/// when any projection field has no direct field name or its slot cannot
/// be resolved.
data_row_direct_projection_field_slots: Option<Vec<(String, usize)>>,
/// One flag per model field, `true` where the plan marks the slot as
/// projected. Only present in test or `diagnostics` builds.
#[cfg(any(test, feature = "diagnostics"))]
projected_slot_mask: Vec<bool>,
}
impl PreparedProjectionShape {
    /// Borrows the projection spec this shape carries.
    #[must_use]
    pub(in crate::db) const fn projection(&self) -> &ProjectionSpec {
        &self.projection
    }

    /// Borrows the prepared projection program.
    #[must_use]
    pub(in crate::db) const fn prepared(&self) -> &PreparedProjectionPlan {
        &self.prepared
    }

    /// Borrows the compiled scalar expressions held by the prepared program.
    #[must_use]
    pub(in crate::db) const fn scalar_projection_exprs(&self) -> &[CompiledExpr] {
        // `Scalar` is the only variant, so this match is exhaustive.
        match &self.prepared {
            PreparedProjectionPlan::Scalar(exprs) => exprs.as_slice(),
        }
    }

    /// Reports whether the plan flagged this projection as the model identity.
    #[must_use]
    pub(in crate::db::executor) const fn projection_is_model_identity(&self) -> bool {
        self.projection_is_model_identity
    }

    /// Borrows the retained-slot `(field name, slot)` pairs, if any were prepared.
    #[must_use]
    pub(in crate::db) fn retained_slot_direct_projection_field_slots(
        &self,
    ) -> Option<&[(String, usize)]> {
        self.retained_slot_direct_projection_field_slots
            .as_ref()
            .map(Vec::as_slice)
    }

    /// Borrows the per-expression direct octet-length slots; the slice is
    /// empty when none were prepared.
    #[must_use]
    pub(in crate::db) const fn retained_slot_direct_octet_length_projection_slots(
        &self,
    ) -> &[Option<usize>] {
        let slots = &self.retained_slot_direct_octet_length_projection_slots;
        slots.as_slice()
    }

    /// Borrows the data-row `(field name, slot)` pairs, if any were prepared.
    #[must_use]
    pub(in crate::db) fn data_row_direct_projection_field_slots(
        &self,
    ) -> Option<&[(String, usize)]> {
        self.data_row_direct_projection_field_slots
            .as_ref()
            .map(Vec::as_slice)
    }

    /// Borrows the projected-slot mask (test and `diagnostics` builds only).
    #[cfg(any(test, feature = "diagnostics"))]
    #[must_use]
    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
        let mask = &self.projected_slot_mask;
        mask.as_slice()
    }

    /// Assembles a shape directly from its parts for tests.
    ///
    /// The direct octet-length slot list is not a parameter; it always starts
    /// out empty.
    #[cfg(test)]
    #[must_use]
    pub(in crate::db) const fn from_test_parts(
        projection: ProjectionSpec,
        prepared: PreparedProjectionPlan,
        projection_is_model_identity: bool,
        retained_slot_direct_projection_field_slots: Option<Vec<(String, usize)>>,
        data_row_direct_projection_field_slots: Option<Vec<(String, usize)>>,
        projected_slot_mask: Vec<bool>,
    ) -> Self {
        let retained_slot_direct_octet_length_projection_slots = Vec::new();
        Self {
            projection,
            prepared,
            projection_is_model_identity,
            retained_slot_direct_projection_field_slots,
            retained_slot_direct_octet_length_projection_slots,
            data_row_direct_projection_field_slots,
            projected_slot_mask,
        }
    }
}
/// Executor-facing alias for `PreparedProjectionShape`, used as the
/// prepared-state parameter of `validate_prepared_projection_row`.
pub(in crate::db::executor) type PreparedSlotProjectionValidation = PreparedProjectionShape;
/// Row view consumed by `validate_prepared_projection_row`: exposes borrowed
/// values addressed by slot index.
pub(in crate::db::executor) trait ProjectionValidationRow {
/// Returns a borrowed value for `slot`, or `None`.
///
/// NOTE(review): the exact meaning of `None` (missing slot vs. unretained
/// value) is defined by implementors and the evaluator — confirm there.
#[must_use]
fn projection_validation_slot_value(&self, slot: usize) -> Option<&Value>;
}
/// Builds the [`PreparedProjectionShape`] for `plan` against `model`.
///
/// Clones the plan's frozen projection spec and compiled scalar program, then
/// derives the direct-slot lookup tables (and, in test or `diagnostics`
/// builds, the projected-slot mask) from them.
///
/// # Panics
///
/// Panics when `plan.scalar_projection_plan()` does not yield a compiled
/// scalar program.
#[must_use]
pub(in crate::db) fn prepare_projection_shape_from_plan(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
) -> PreparedProjectionShape {
    let spec = plan.frozen_projection_spec().clone();
    let scalar_exprs = plan
        .scalar_projection_plan()
        .expect("scalar execution projection shapes must carry one planner-compiled scalar program")
        .to_vec();

    // Every derivation below only borrows `spec` / `scalar_exprs`, so all of
    // them run before those values are moved into the returned shape.
    let data_row_field_slots = data_row_direct_projection_field_slots_from_projection(model, &spec);
    let octet_length_slots =
        retained_slot_direct_octet_length_projection_slots_from_compiled(model, &scalar_exprs);
    let retained_field_slots = retained_slot_direct_projection_field_slots_from_projection(
        &spec,
        plan.frozen_direct_projection_slots(),
    );
    #[cfg(any(test, feature = "diagnostics"))]
    let slot_mask =
        projected_slot_mask_from_slots(model.fields().len(), plan.projected_slot_mask());

    PreparedProjectionShape {
        projection: spec,
        prepared: PreparedProjectionPlan::Scalar(scalar_exprs),
        projection_is_model_identity: plan.projection_is_model_identity(),
        retained_slot_direct_projection_field_slots: retained_field_slots,
        retained_slot_direct_octet_length_projection_slots: octet_length_slots,
        data_row_direct_projection_field_slots: data_row_field_slots,
        #[cfg(any(test, feature = "diagnostics"))]
        projected_slot_mask: slot_mask,
    }
}
/// Evaluates every compiled projection expression against `row` and discards
/// the results, surfacing only evaluation failures.
///
/// Nothing is evaluated when the prepared shape is flagged as the model
/// identity projection.
///
/// # Errors
///
/// Returns the first evaluation error, converted through
/// `ProjectionEvalError::into_invalid_logical_plan_internal_error`.
pub(in crate::db::executor) fn validate_prepared_projection_row(
    prepared_validation: &PreparedSlotProjectionValidation,
    row: &impl ProjectionValidationRow,
) -> Result<(), InternalError> {
    if !prepared_validation.projection_is_model_identity() {
        for expr in prepared_validation.scalar_projection_exprs() {
            // A fresh reader per expression; it only forwards slot lookups to `row`.
            let mut slot_reader = |slot| row.projection_validation_slot_value(slot);
            let evaluated = eval_compiled_expr_with_value_ref_reader(expr, &mut slot_reader);
            evaluated.map_err(ProjectionEvalError::into_invalid_logical_plan_internal_error)?;
        }
    }

    Ok(())
}
/// Pairs each projection field's direct field name with the matching entry of
/// `direct_projection_slots`.
///
/// Returns `None` when no slots were supplied or when any paired field has no
/// direct field name. Pairing stops at the shorter of the two sequences.
fn retained_slot_direct_projection_field_slots_from_projection(
    projection: &ProjectionSpec,
    direct_projection_slots: Option<&[usize]>,
) -> Option<Vec<(String, usize)>> {
    let slots = direct_projection_slots?;

    projection
        .fields()
        .zip(slots.iter().copied())
        .map(|(field, slot)| {
            let name = field.direct_field_name()?;
            Some((name.to_string(), slot))
        })
        .collect()
}
/// Resolves every projection field to a `(field name, slot)` pair through
/// `model`.
///
/// Returns `None` as soon as a field has no direct field name or the model
/// cannot resolve a slot for that name.
fn data_row_direct_projection_field_slots_from_projection(
    model: &EntityModel,
    projection: &ProjectionSpec,
) -> Option<Vec<(String, usize)>> {
    projection
        .fields()
        .map(|field| {
            let name = field.direct_field_name()?;
            let slot = model.resolve_field_slot(name)?;
            Some((name.to_string(), slot))
        })
        .collect()
}
/// Maps each compiled expression to the slot whose octet length it reads
/// directly, keeping only slots whose leaf codec is scalar `Blob` or `Text`.
///
/// The result has one entry per expression when at least one expression
/// qualifies, and is empty otherwise.
fn retained_slot_direct_octet_length_projection_slots_from_compiled(
    model: &EntityModel,
    compiled_projection: &[CompiledExpr],
) -> Vec<Option<usize>> {
    let per_expr_slots: Vec<Option<usize>> = compiled_projection
        .iter()
        .map(|expr| match expr.direct_octet_length_slot() {
            Some((slot, _field)) if slot_uses_scalar_byte_length_codec(model, slot) => Some(slot),
            _ => None,
        })
        .collect();

    // An all-`None` table carries no information, so collapse it to empty.
    if per_expr_slots.iter().any(Option::is_some) {
        per_expr_slots
    } else {
        Vec::new()
    }
}
/// Reports whether the model field at `slot` uses a scalar `Blob` or `Text`
/// leaf codec. Out-of-range slots yield `false`.
fn slot_uses_scalar_byte_length_codec(model: &EntityModel, slot: usize) -> bool {
    match model.fields().get(slot) {
        Some(field) => matches!(
            field.leaf_codec(),
            LeafCodec::Scalar(ScalarCodec::Blob | ScalarCodec::Text)
        ),
        None => false,
    }
}
/// Produces a mask of exactly `field_count` flags from `projected_slots`.
///
/// Entry `i` is `true` only when `projected_slots[i]` exists and is `true`;
/// a shorter input is padded with `false` and entries beyond `field_count`
/// are ignored.
#[cfg(any(test, feature = "diagnostics"))]
fn projected_slot_mask_from_slots(field_count: usize, projected_slots: &[bool]) -> Vec<bool> {
    (0..field_count)
        .map(|slot| projected_slots.get(slot).copied().unwrap_or(false))
        .collect()
}