use crate::{
db::{
data::{CanonicalSlotReader, DataRow},
executor::{
StructuralCursorPage, StructuralCursorPagePayload,
projection::{
PreparedProjectionShape, ProjectionEvalError, ScalarProjectionExpr,
eval_canonical_scalar_projection_expr_with_required_value_reader_cow,
visit_prepared_projection_values_with_required_value_reader_cow,
},
terminal::{RetainedSlotRow, RowLayout},
},
query::plan::AccessPlannedQuery,
},
error::InternalError,
value::Value,
};
use std::borrow::Cow;
#[cfg(any(test, feature = "diagnostics"))]
use std::cell::RefCell;
/// Projects one structural cursor page into owned SQL result rows.
///
/// This is the `Value`-emitting entry point: it forwards to
/// `shape_structural_sql_projection_page` with the slot-row and data-row
/// projectors defined in this module.
///
/// # Errors
/// Propagates any `InternalError` produced by the selected projector.
#[cfg(feature = "sql")]
pub(in crate::db::session::sql::projection::runtime) fn project_structural_sql_projection_page(
    row_layout: RowLayout,
    prepared_projection: &PreparedProjectionShape,
    page: StructuralCursorPage,
) -> Result<Vec<Vec<Value>>, InternalError> {
    let projected_rows: Vec<Vec<Value>> = shape_structural_sql_projection_page(
        row_layout,
        prepared_projection,
        page,
        project_slot_rows_from_projection_structural,
        project_data_rows_from_projection_structural,
    )?;

    Ok(projected_rows)
}
/// Consumes a structural cursor page and shapes its payload with the
/// callback that matches the payload variant.
///
/// * `shape_slot_rows` receives the owned retained slot rows.
/// * `shape_data_rows` receives the row layout and a slice of raw data rows.
///
/// In test and diagnostics builds the chosen path is recorded in the
/// projection materialization metrics before the callback runs.
///
/// # Errors
/// Returns whatever error the invoked callback returns.
#[cfg(feature = "sql")]
fn shape_structural_sql_projection_page<T>(
    row_layout: RowLayout,
    prepared_projection: &PreparedProjectionShape,
    page: StructuralCursorPage,
    shape_slot_rows: impl FnOnce(
        &PreparedProjectionShape,
        Vec<RetainedSlotRow>,
    ) -> Result<Vec<Vec<T>>, InternalError>,
    shape_data_rows: impl FnOnce(
        RowLayout,
        &PreparedProjectionShape,
        &[DataRow],
    ) -> Result<Vec<Vec<T>>, InternalError>,
) -> Result<Vec<Vec<T>>, InternalError> {
    match page.into_payload() {
        StructuralCursorPagePayload::DataRows(raw_rows) => {
            #[cfg(any(test, feature = "diagnostics"))]
            record_sql_projection_data_rows_path_hit();

            shape_data_rows(row_layout, prepared_projection, raw_rows.as_slice())
        }
        StructuralCursorPagePayload::SlotRows(retained_rows) => {
            #[cfg(any(test, feature = "diagnostics"))]
            record_sql_projection_slot_rows_path_hit();

            shape_slot_rows(prepared_projection, retained_rows)
        }
    }
}
/// Projects retained slot rows into plain `Value` cells.
///
/// Thin adapter over `shape_slot_rows_from_projection_structural` that emits
/// every projected value unchanged.
///
/// Gated on the `sql` feature like its only caller and the generic shaper it
/// delegates to; without the gate a non-`sql` build would reference a
/// cfg-stripped item.
///
/// # Errors
/// Propagates the `InternalError` returned by the underlying shaper.
#[cfg(feature = "sql")]
fn project_slot_rows_from_projection_structural(
    prepared_projection: &PreparedProjectionShape,
    rows: Vec<RetainedSlotRow>,
) -> Result<Vec<Vec<Value>>, InternalError> {
    // Identity emitter: the SQL surface wants the projected values as-is.
    let mut emit_value = std::convert::identity;

    shape_slot_rows_from_projection_structural(prepared_projection, rows, &mut emit_value)
}
/// Shapes retained slot rows, choosing between two strategies.
///
/// When the prepared projection exposes direct field slots for retained rows,
/// values are moved straight out of those slots; otherwise the dense path
/// walks the prepared projection for every row.
///
/// # Errors
/// Propagates the `InternalError` returned by the chosen strategy.
#[cfg(feature = "sql")]
fn shape_slot_rows_from_projection_structural<T>(
    prepared_projection: &PreparedProjectionShape,
    rows: Vec<RetainedSlotRow>,
    emit_value: &mut impl FnMut(Value) -> T,
) -> Result<Vec<Vec<T>>, InternalError> {
    match prepared_projection.retained_slot_direct_projection_field_slots() {
        Some(direct_slots) => {
            shape_slot_rows_from_direct_field_slots(rows, direct_slots, emit_value)
        }
        None => {
            shape_dense_slot_rows_from_projection_structural(prepared_projection, rows, emit_value)
        }
    }
}
/// Shapes retained slot rows by visiting the prepared projection per row.
///
/// Slot values are handed to the visitor as borrowed `Cow`s; each value the
/// visitor yields is passed through `emit_value` and appended to the output
/// row.
///
/// # Errors
/// A slot that the visitor requests but the row does not hold is reported as
/// `ProjectionEvalError::MissingFieldValue` (field name `slot[<index>]`),
/// converted to an invalid-logical-plan `InternalError`. Visitor errors are
/// propagated unchanged.
#[cfg(feature = "sql")]
fn shape_dense_slot_rows_from_projection_structural<T>(
    prepared_projection: &PreparedProjectionShape,
    rows: Vec<RetainedSlotRow>,
    emit_value: &mut impl FnMut(Value) -> T,
) -> Result<Vec<Vec<T>>, InternalError> {
    let column_count = prepared_projection.projection().len();
    let mut output = Vec::with_capacity(rows.len());

    for retained in &rows {
        let mut columns = Vec::with_capacity(column_count);

        // Borrow straight from the retained row; only absent slots allocate
        // (for the error's field label).
        let mut slot_reader = |slot: usize| match retained.slot_ref(slot) {
            Some(value) => Ok(Cow::Borrowed(value)),
            None => Err(ProjectionEvalError::MissingFieldValue {
                field: format!("slot[{slot}]"),
                index: slot,
            }
            .into_invalid_logical_plan_internal_error()),
        };

        visit_prepared_projection_values_with_required_value_reader_cow(
            prepared_projection.prepared(),
            &mut slot_reader,
            &mut |value| columns.push(emit_value(value)),
        )?;

        output.push(columns);
    }

    Ok(output)
}
/// Shapes retained slot rows by moving values out of the listed slots.
///
/// For every row, each `(field name, slot index)` pair in `field_slots` is
/// taken from the row in order and passed through `emit_value`.
///
/// # Errors
/// If a listed slot yields no value, returns
/// `ProjectionEvalError::MissingFieldValue` (carrying the field name and slot
/// index) converted to an invalid-logical-plan `InternalError`.
#[cfg(feature = "sql")]
fn shape_slot_rows_from_direct_field_slots<T>(
    rows: Vec<RetainedSlotRow>,
    field_slots: &[(String, usize)],
    emit_value: &mut impl FnMut(Value) -> T,
) -> Result<Vec<Vec<T>>, InternalError> {
    let mut output = Vec::with_capacity(rows.len());

    for mut retained in rows {
        let mut columns = Vec::with_capacity(field_slots.len());

        for (name, slot_index) in field_slots {
            let Some(value) = retained.take_slot(*slot_index) else {
                let missing = ProjectionEvalError::MissingFieldValue {
                    field: name.clone(),
                    index: *slot_index,
                };
                return Err(missing.into_invalid_logical_plan_internal_error());
            };
            columns.push(emit_value(value));
        }

        output.push(columns);
    }

    Ok(output)
}
/// Projects raw data rows into plain `Value` cells.
///
/// Uses the direct field-slot path when the prepared projection offers one
/// for data rows; otherwise evaluates the compiled scalar projection
/// expressions per row. In test and diagnostics builds the scalar fallback is
/// counted and the projected-slot mask is forwarded so slot accesses can be
/// classified; other builds pass an empty mask.
///
/// # Errors
/// Propagates the `InternalError` returned by the chosen shaping routine.
#[cfg(feature = "sql")]
fn project_data_rows_from_projection_structural(
    row_layout: RowLayout,
    prepared_projection: &PreparedProjectionShape,
    rows: &[DataRow],
) -> Result<Vec<Vec<Value>>, InternalError> {
    // Values are surfaced unchanged on this path.
    let mut passthrough = |value: Value| value;

    match prepared_projection.data_row_direct_projection_field_slots() {
        Some(direct_slots) => shape_data_rows_from_direct_field_slots(
            rows,
            row_layout,
            direct_slots,
            &mut passthrough,
        ),
        None => {
            let scalar_exprs = prepared_projection.scalar_projection_exprs();

            #[cfg(any(test, feature = "diagnostics"))]
            let slot_mask = prepared_projection.projected_slot_mask();
            #[cfg(not(any(test, feature = "diagnostics")))]
            let slot_mask: &[bool] = &[];

            #[cfg(any(test, feature = "diagnostics"))]
            record_sql_projection_data_rows_scalar_fallback_hit();

            shape_scalar_data_rows_from_projection_structural(
                scalar_exprs,
                rows,
                row_layout,
                slot_mask,
                &mut passthrough,
            )
        }
    }
}
/// Shapes raw data rows by reading the listed slots directly.
///
/// Each raw row is opened through `row_layout` and its storage key is
/// validated against the row's data key before any slot is read. Every slot
/// in `field_slots` is then read in order and passed through `emit_value`.
/// Test and diagnostics builds count each read as a projected-slot access.
///
/// # Errors
/// Propagates errors from opening the row, validating its key, or reading a
/// required slot.
#[cfg(feature = "sql")]
fn shape_data_rows_from_direct_field_slots<T>(
    rows: &[DataRow],
    row_layout: RowLayout,
    field_slots: &[(String, usize)],
    emit_value: &mut impl FnMut(Value) -> T,
) -> Result<Vec<Vec<T>>, InternalError> {
    let column_count = field_slots.len();
    let mut output = Vec::with_capacity(rows.len());

    for (storage_key, encoded_row) in rows {
        let opened = row_layout.open_raw_row(encoded_row)?;
        opened.validate_storage_key(storage_key)?;

        let mut columns = Vec::with_capacity(column_count);
        for (_, slot_index) in field_slots {
            #[cfg(any(test, feature = "diagnostics"))]
            record_sql_projection_data_rows_slot_access(true);

            let cell = opened.required_value_by_contract(*slot_index)?;
            columns.push(emit_value(cell));
        }

        output.push(columns);
    }

    Ok(output)
}
/// Shapes raw data rows by evaluating compiled scalar projection expressions.
///
/// Each raw row is opened through `row_layout` and key-validated, then every
/// expression in `compiled_fields` is evaluated against a slot reader backed
/// by that row. The owned result is passed through `emit_value`.
///
/// `projected_slot_mask` is consulted only in test and diagnostics builds,
/// where each slot read is classified as projected or non-projected (slots
/// beyond the mask count as non-projected). Other builds ignore it.
///
/// # Errors
/// Propagates errors from opening the row, validating its key, reading a
/// slot, or evaluating an expression.
#[cfg(feature = "sql")]
fn shape_scalar_data_rows_from_projection_structural<T>(
    compiled_fields: &[ScalarProjectionExpr],
    rows: &[DataRow],
    row_layout: RowLayout,
    projected_slot_mask: &[bool],
    emit_value: &mut impl FnMut(Value) -> T,
) -> Result<Vec<Vec<T>>, InternalError> {
    // The mask only feeds metrics; mark it used when metrics are compiled out.
    #[cfg(not(any(test, feature = "diagnostics")))]
    let _ = projected_slot_mask;

    let column_count = compiled_fields.len();
    let mut output = Vec::with_capacity(rows.len());

    for (storage_key, encoded_row) in rows {
        let opened = row_layout.open_raw_row(encoded_row)?;
        opened.validate_storage_key(storage_key)?;

        let mut columns = Vec::with_capacity(column_count);
        for expr in compiled_fields {
            let evaluated = eval_canonical_scalar_projection_expr_with_required_value_reader_cow(
                expr,
                &mut |slot| {
                    #[cfg(any(test, feature = "diagnostics"))]
                    record_sql_projection_data_rows_slot_access(
                        projected_slot_mask.get(slot).copied().unwrap_or(false),
                    );
                    opened.required_value_by_contract_cow(slot)
                },
            )?;
            columns.push(emit_value(evaluated.into_owned()));
        }

        output.push(columns);
    }

    Ok(output)
}
/// Applies post-projection `DISTINCT` handling to already-projected rows.
///
/// * When the plan is not distinct, `rows` is returned untouched (no paging
///   is applied here — presumably it has already happened upstream; confirm
///   against the caller).
/// * When the plan is distinct, rows are de-duplicated by their canonical
///   group key, keeping the first occurrence of each row in input order, and
///   the plan's page window (offset/limit), if any, is then applied to the
///   de-duplicated rows.
///
/// # Errors
/// Returns an `InternalError` if a row cannot be canonicalized into a group
/// key.
#[cfg(feature = "sql")]
pub(in crate::db::session::sql::projection::runtime) fn finalize_sql_projection_rows(
    plan: &AccessPlannedQuery,
    rows: Vec<Vec<Value>>,
) -> Result<Vec<Vec<Value>>, InternalError> {
    if !plan.scalar_plan().distinct {
        return Ok(rows);
    }

    let mut distinct_rows = crate::db::executor::group::GroupKeySet::new();
    let mut deduped_rows = Vec::with_capacity(rows.len());
    for row in rows {
        // Move the row into the key wrapper instead of deep-cloning it; the
        // row is moved back out below when it turns out to be new.
        let key = Value::List(row);
        let is_new = distinct_rows
            .insert_value(&key)
            .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
        if is_new {
            let Value::List(row) = key else {
                unreachable!("distinct key was constructed as Value::List");
            };
            deduped_rows.push(row);
        }
    }

    // Paging must see the de-duplicated rows, so it runs after DISTINCT.
    if let Some(page) = plan.scalar_plan().page.as_ref() {
        apply_sql_projection_page_window(&mut deduped_rows, page.offset, page.limit);
    }

    Ok(deduped_rows)
}
/// Trims `rows` in place to the window described by `offset` and `limit`.
///
/// The first `offset` rows are removed (all rows if `offset` exceeds the
/// length); then, if `limit` is given, at most `limit` rows are kept.
#[cfg(feature = "sql")]
pub(in crate::db::session::sql::projection::runtime) fn apply_sql_projection_page_window<T>(
    rows: &mut Vec<T>,
    offset: u32,
    limit: Option<u32>,
) {
    // An offset that does not fit in `usize` necessarily skips everything.
    let available = rows.len();
    let skip = usize::try_from(offset).map_or(available, |wanted| wanted.min(available));
    if skip != 0 {
        rows.drain(..skip);
    }

    // `truncate` is a no-op when the cap is at or beyond the current length.
    if let Some(cap) = limit.map(|raw| usize::try_from(raw).unwrap_or(usize::MAX)) {
        rows.truncate(cap);
    }
}
/// Counters describing which SQL projection materialization paths ran and
/// how many slot/field accesses they performed during one capture.
///
/// Only compiled for tests and for builds with the `diagnostics` feature.
/// All counters start at zero (`Default`) and are bumped with saturating
/// addition by the `record_sql_projection_*` helpers in this module.
#[cfg(any(test, feature = "diagnostics"))]
// In test-only builds (no `diagnostics`) the `pub` items are not reachable
// from outside the crate, so the `unreachable_pub` lint is silenced there.
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SqlProjectionMaterializationMetrics {
/// Bumped by `record_sql_projection_hybrid_covering_path_hit`.
pub hybrid_covering_path_hits: u64,
/// Bumped by `record_sql_projection_hybrid_covering_index_field_access`.
pub hybrid_covering_index_field_accesses: u64,
/// Bumped by `record_sql_projection_hybrid_covering_row_field_access`.
pub hybrid_covering_row_field_accesses: u64,
// NOTE(review): no writer for this counter is visible in this file —
// presumably recorded elsewhere; confirm.
pub projected_rows_path_hits: u64,
/// Pages whose payload was retained slot rows.
pub slot_rows_path_hits: u64,
/// Pages whose payload was raw data rows.
pub data_rows_path_hits: u64,
/// Data-row projections that had no direct field slots and fell back to
/// evaluating compiled scalar projection expressions.
pub data_rows_scalar_fallback_hits: u64,
// NOTE(review): no writer for this counter is visible in this file —
// presumably recorded elsewhere; confirm.
pub data_rows_generic_fallback_hits: u64,
/// Data-row slot reads classified as projected (every read on the direct
/// field-slot path, and masked-in reads on the scalar path).
pub data_rows_projected_slot_accesses: u64,
/// Data-row slot reads on the scalar path whose slot is not set in (or lies
/// beyond) the projected-slot mask.
pub data_rows_non_projected_slot_accesses: u64,
// NOTE(review): no writer for this counter is visible in this file —
// presumably recorded elsewhere; confirm.
pub full_row_decode_materializations: u64,
}
// Per-thread capture slot for projection materialization metrics.
//
// `None` means no capture is active, in which case metric updates are
// dropped; `with_sql_projection_materialization_metrics` installs `Some` for
// the duration of its closure and takes the value back out afterwards.
#[cfg(any(test, feature = "diagnostics"))]
std::thread_local! {
static SQL_PROJECTION_MATERIALIZATION_METRICS: RefCell<Option<SqlProjectionMaterializationMetrics>> = const {
RefCell::new(None)
};
}
/// Runs `update` against this thread's active metrics capture, if any.
///
/// When no capture is installed the closure is dropped without being called.
#[cfg(any(test, feature = "diagnostics"))]
fn update_sql_projection_materialization_metrics(
    update: impl FnOnce(&mut SqlProjectionMaterializationMetrics),
) {
    SQL_PROJECTION_MATERIALIZATION_METRICS.with(|cell| {
        let mut slot = cell.borrow_mut();
        if let Some(active) = slot.as_mut() {
            update(active);
        }
    });
}
/// Counts one page shaped through the retained slot-rows path.
#[cfg(any(test, feature = "diagnostics"))]
fn record_sql_projection_slot_rows_path_hit() {
    update_sql_projection_materialization_metrics(|metrics| {
        let hits = &mut metrics.slot_rows_path_hits;
        *hits = hits.saturating_add(1);
    });
}
/// Counts one page shaped through the raw data-rows path.
#[cfg(any(test, feature = "diagnostics"))]
fn record_sql_projection_data_rows_path_hit() {
    update_sql_projection_materialization_metrics(|metrics| {
        let hits = &mut metrics.data_rows_path_hits;
        *hits = hits.saturating_add(1);
    });
}
/// Counts one use of the hybrid covering projection path.
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db::session::sql::projection::runtime) fn record_sql_projection_hybrid_covering_path_hit()
{
    update_sql_projection_materialization_metrics(|metrics| {
        let hits = &mut metrics.hybrid_covering_path_hits;
        *hits = hits.saturating_add(1);
    });
}
/// Counts one index-field access made by the hybrid covering path.
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db::session::sql::projection::runtime) fn record_sql_projection_hybrid_covering_index_field_access()
{
    update_sql_projection_materialization_metrics(|metrics| {
        let accesses = &mut metrics.hybrid_covering_index_field_accesses;
        *accesses = accesses.saturating_add(1);
    });
}
/// Counts one row-field access made by the hybrid covering path.
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db::session::sql::projection::runtime) fn record_sql_projection_hybrid_covering_row_field_access()
{
    update_sql_projection_materialization_metrics(|metrics| {
        let accesses = &mut metrics.hybrid_covering_row_field_accesses;
        *accesses = accesses.saturating_add(1);
    });
}
/// Counts one data-row projection that fell back to scalar expression
/// evaluation.
#[cfg(any(test, feature = "diagnostics"))]
fn record_sql_projection_data_rows_scalar_fallback_hit() {
    update_sql_projection_materialization_metrics(|metrics| {
        let hits = &mut metrics.data_rows_scalar_fallback_hits;
        *hits = hits.saturating_add(1);
    });
}
/// Counts one data-row slot access, classified by `projected_slot`.
///
/// `true` bumps the projected-slot counter, `false` the non-projected one.
#[cfg(any(test, feature = "diagnostics"))]
fn record_sql_projection_data_rows_slot_access(projected_slot: bool) {
    update_sql_projection_materialization_metrics(|metrics| {
        let counter = if projected_slot {
            &mut metrics.data_rows_projected_slot_accesses
        } else {
            &mut metrics.data_rows_non_projected_slot_accesses
        };
        *counter = counter.saturating_add(1);
    });
}
/// Shared implementation behind both cfg variants of
/// `with_sql_projection_materialization_metrics`.
///
/// Installs a zeroed capture in this thread's metrics slot, runs `f`, then
/// takes the capture back out and returns it alongside `f`'s result. If the
/// slot is unexpectedly empty afterwards, default (all-zero) metrics are
/// returned.
///
/// Captures must not nest: debug builds assert that no capture is active on
/// entry. Note that if `f` panics the capture is left installed on this
/// thread, because the take-back step never runs.
#[cfg(any(test, feature = "diagnostics"))]
fn capture_sql_projection_materialization_metrics<T>(
    f: impl FnOnce() -> T,
) -> (T, SqlProjectionMaterializationMetrics) {
    SQL_PROJECTION_MATERIALIZATION_METRICS.with(|metrics| {
        debug_assert!(
            metrics.borrow().is_none(),
            "sql projection metrics captures should not nest"
        );
        *metrics.borrow_mut() = Some(SqlProjectionMaterializationMetrics::default());
    });

    let result = f();

    let metrics = SQL_PROJECTION_MATERIALIZATION_METRICS
        .with(|metrics| metrics.borrow_mut().take().unwrap_or_default());

    (result, metrics)
}

/// Runs `f` while capturing SQL projection materialization metrics on the
/// current thread, returning `f`'s result together with the captured metrics.
///
/// Public variant, available with the `diagnostics` feature. Captures must
/// not nest (asserted in debug builds).
#[cfg(feature = "diagnostics")]
pub fn with_sql_projection_materialization_metrics<T>(
    f: impl FnOnce() -> T,
) -> (T, SqlProjectionMaterializationMetrics) {
    capture_sql_projection_materialization_metrics(f)
}

/// Runs `f` while capturing SQL projection materialization metrics on the
/// current thread, returning `f`'s result together with the captured metrics.
///
/// Crate-private variant for test builds without the `diagnostics` feature.
/// Captures must not nest (asserted in debug builds).
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) fn with_sql_projection_materialization_metrics<T>(
    f: impl FnOnce() -> T,
) -> (T, SqlProjectionMaterializationMetrics) {
    capture_sql_projection_materialization_metrics(f)
}