use crate::{
db::{
Db,
data::{CanonicalSlotReader, DataRow, StructuralSlotReader},
executor::{
EntityAuthority,
pipeline::entrypoints::{
execute_initial_scalar_rows_for_canister,
execute_initial_scalar_text_rows_for_canister,
},
projection::{
ProjectionEvalError, direct_projection_field_slots,
eval::eval_canonical_scalar_projection_expr_with_required_value_reader,
materialize::{
prepare_projection_plan, visit_prepared_projection_values_with_value_reader,
visit_projection_values_with_required_value_reader,
},
},
terminal::RetainedSlotRow,
},
query::plan::{
AccessPlannedQuery,
expr::{ProjectionSpec, projection_field_direct_field_name},
},
},
error::InternalError,
model::entity::{EntityModel, resolve_field_slot},
traits::CanisterKind,
value::{Value, ValueEnum},
};
#[cfg(any(test, feature = "structural-read-metrics"))]
use std::cell::RefCell;
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct SqlProjectionRows {
rows: Vec<Vec<Value>>,
row_count: u32,
}
#[cfg(feature = "sql")]
impl SqlProjectionRows {
#[must_use]
pub(in crate::db) const fn new(rows: Vec<Vec<Value>>, row_count: u32) -> Self {
Self { rows, row_count }
}
#[must_use]
pub(in crate::db) fn into_parts(self) -> (Vec<Vec<Value>>, u32) {
(self.rows, self.row_count)
}
}
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct SqlProjectionTextRows {
rows: Vec<Vec<String>>,
row_count: u32,
}
#[cfg(feature = "sql")]
impl SqlProjectionTextRows {
#[must_use]
pub(in crate::db) const fn new(rows: Vec<Vec<String>>, row_count: u32) -> Self {
Self { rows, row_count }
}
#[must_use]
pub(in crate::db) fn into_parts(self) -> (Vec<Vec<String>>, u32) {
(self.rows, self.row_count)
}
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_sql_projection_rows_for_canister<C>(
db: &Db<C>,
debug: bool,
model: &'static EntityModel,
projection: ProjectionSpec,
authority: EntityAuthority,
plan: AccessPlannedQuery,
) -> Result<SqlProjectionRows, InternalError>
where
C: CanisterKind,
{
let page = execute_initial_scalar_rows_for_canister(db, debug, authority, plan)?;
let (slot_rows, projected_rows, rendered_projected_rows, data_rows) = page.into_sql_parts();
let projected = if rendered_projected_rows.is_some() {
return Err(InternalError::query_executor_invariant(
"value SQL projection path must not receive rendered-only projected rows",
));
} else if let Some(projected_rows) = projected_rows {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_projected_rows_path_hit();
projected_rows
} else if let Some(slot_rows) = slot_rows {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_slot_rows_path_hit();
project_slot_rows_from_projection_structural(model, &projection, slot_rows)?
} else {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_data_rows_path_hit();
project_data_rows_from_projection_structural(model, &projection, data_rows.as_slice())?
};
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
Ok(SqlProjectionRows::new(projected, row_count))
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_sql_projection_text_rows_for_canister<C>(
db: &Db<C>,
debug: bool,
model: &'static EntityModel,
projection: ProjectionSpec,
authority: EntityAuthority,
plan: AccessPlannedQuery,
) -> Result<SqlProjectionTextRows, InternalError>
where
C: CanisterKind,
{
let page = execute_initial_scalar_text_rows_for_canister(db, debug, authority, plan)?;
let (slot_rows, projected_rows, rendered_projected_rows, data_rows) = page.into_sql_parts();
let rendered_rows = if let Some(rendered_projected_rows) = rendered_projected_rows {
rendered_projected_rows
} else {
let projected = if let Some(projected_rows) = projected_rows {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_projected_rows_path_hit();
projected_rows
} else if let Some(slot_rows) = slot_rows {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_slot_rows_path_hit();
project_slot_rows_from_projection_structural(model, &projection, slot_rows)?
} else {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_data_rows_path_hit();
project_data_rows_from_projection_structural(model, &projection, data_rows.as_slice())?
};
render_sql_projection_rows_from_values(projected)
};
let row_count = u32::try_from(rendered_rows.len()).unwrap_or(u32::MAX);
Ok(SqlProjectionTextRows::new(rendered_rows, row_count))
}
#[cfg(feature = "sql")]
fn render_sql_projection_rows_from_values(rows: Vec<Vec<Value>>) -> Vec<Vec<String>> {
let mut rendered_rows = Vec::with_capacity(rows.len());
for row in rows {
let rendered_row = row
.iter()
.map(render_sql_projection_value_text)
.collect::<Vec<_>>();
rendered_rows.push(rendered_row);
}
rendered_rows
}
#[cfg(feature = "sql")]
fn render_sql_projection_value_text(value: &Value) -> String {
match value {
Value::Account(v) => v.to_string(),
Value::Blob(v) => render_sql_projection_blob(v.as_slice()),
Value::Bool(v) => v.to_string(),
Value::Date(v) => v.to_string(),
Value::Decimal(v) => v.to_string(),
Value::Duration(v) => render_sql_projection_duration(v.as_millis()),
Value::Enum(v) => render_sql_projection_enum(v),
Value::Float32(v) => v.to_string(),
Value::Float64(v) => v.to_string(),
Value::Int(v) => v.to_string(),
Value::Int128(v) => v.to_string(),
Value::IntBig(v) => v.to_string(),
Value::List(items) => render_sql_projection_list(items.as_slice()),
Value::Map(entries) => render_sql_projection_map(entries.as_slice()),
Value::Null => "null".to_string(),
Value::Principal(v) => v.to_string(),
Value::Subaccount(v) => v.to_string(),
Value::Text(v) => v.clone(),
Value::Timestamp(v) => v.as_millis().to_string(),
Value::Uint(v) => v.to_string(),
Value::Uint128(v) => v.to_string(),
Value::UintBig(v) => v.to_string(),
Value::Ulid(v) => v.to_string(),
Value::Unit => "()".to_string(),
}
}
#[cfg(feature = "sql")]
fn render_sql_projection_blob(bytes: &[u8]) -> String {
let mut rendered = String::from("0x");
rendered.push_str(sql_projection_hex_encode(bytes).as_str());
rendered
}
#[cfg(feature = "sql")]
fn render_sql_projection_duration(millis: u64) -> String {
let mut rendered = millis.to_string();
rendered.push_str("ms");
rendered
}
#[cfg(feature = "sql")]
fn render_sql_projection_list(items: &[Value]) -> String {
let mut rendered = String::from("[");
for (index, item) in items.iter().enumerate() {
if index != 0 {
rendered.push_str(", ");
}
rendered.push_str(render_sql_projection_value_text(item).as_str());
}
rendered.push(']');
rendered
}
#[cfg(feature = "sql")]
fn render_sql_projection_map(entries: &[(Value, Value)]) -> String {
let mut rendered = String::from("{");
for (index, (key, value)) in entries.iter().enumerate() {
if index != 0 {
rendered.push_str(", ");
}
rendered.push_str(render_sql_projection_value_text(key).as_str());
rendered.push_str(": ");
rendered.push_str(render_sql_projection_value_text(value).as_str());
}
rendered.push('}');
rendered
}
#[cfg(feature = "sql")]
fn sql_projection_hex_encode(bytes: &[u8]) -> String {
const HEX: &[u8; 16] = b"0123456789abcdef";
let mut out = String::with_capacity(bytes.len().saturating_mul(2));
for byte in bytes {
out.push(HEX[(byte >> 4) as usize] as char);
out.push(HEX[(byte & 0x0f) as usize] as char);
}
out
}
#[cfg(feature = "sql")]
fn render_sql_projection_enum(value: &ValueEnum) -> String {
let mut rendered = String::new();
if let Some(path) = value.path() {
rendered.push_str(path);
rendered.push_str("::");
}
rendered.push_str(value.variant());
if let Some(payload) = value.payload() {
rendered.push('(');
rendered.push_str(render_sql_projection_value_text(payload).as_str());
rendered.push(')');
}
rendered
}
fn project_slot_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: Vec<RetainedSlotRow>,
) -> Result<Vec<Vec<Value>>, InternalError> {
if let Some(field_slots) = direct_projection_field_slots(model, projection) {
return project_slot_rows_from_direct_field_slots(rows, field_slots.as_slice());
}
project_dense_slot_rows_from_projection_structural(model, projection, rows)
}
#[cfg(feature = "sql")]
fn project_dense_slot_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: Vec<RetainedSlotRow>,
) -> Result<Vec<Vec<Value>>, InternalError> {
let prepared = prepare_projection_plan(model, projection);
let mut projected_rows = Vec::with_capacity(rows.len());
for row in &rows {
let mut values = Vec::with_capacity(projection.len());
let mut read_slot = |slot: usize| row.slot(slot);
visit_prepared_projection_values_with_value_reader(
&prepared,
projection,
model,
&mut read_slot,
&mut |value| values.push(value),
)
.map_err(
crate::db::executor::projection::ProjectionEvalError::into_invalid_logical_plan_internal_error,
)?;
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn project_slot_rows_from_direct_field_slots(
rows: Vec<RetainedSlotRow>,
field_slots: &[(String, usize)],
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(rows.len());
for mut row in rows {
let mut values = Vec::with_capacity(field_slots.len());
for (field_name, slot) in field_slots {
let value = row
.take_slot(*slot)
.ok_or_else(|| ProjectionEvalError::MissingFieldValue {
field: field_name.clone(),
index: *slot,
})
.map_err(ProjectionEvalError::into_invalid_logical_plan_internal_error)?;
values.push(value);
}
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn project_data_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: &[DataRow],
) -> Result<Vec<Vec<Value>>, InternalError> {
let projected_slot_mask = direct_projection_slot_mask(model, projection);
match prepare_projection_plan(model, projection) {
super::PreparedProjectionPlan::Generic => {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_data_rows_generic_fallback_hit();
project_generic_data_rows_from_projection_structural(
model,
projection,
rows,
projected_slot_mask.as_slice(),
)
}
super::PreparedProjectionPlan::Scalar(compiled_fields) => {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_data_rows_scalar_fallback_hit();
project_scalar_data_rows_from_projection_structural(
compiled_fields.as_slice(),
rows,
model,
projected_slot_mask.as_slice(),
)
}
}
}
#[cfg(feature = "sql")]
fn project_scalar_data_rows_from_projection_structural(
compiled_fields: &[crate::db::executor::projection::ScalarProjectionExpr],
rows: &[DataRow],
model: &'static EntityModel,
projected_slot_mask: &[bool],
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(rows.len());
#[cfg(not(any(test, feature = "structural-read-metrics")))]
let _ = projected_slot_mask;
for (data_key, raw_row) in rows {
let row_fields = StructuralSlotReader::from_raw_row(raw_row, model)?;
row_fields.validate_storage_key(data_key)?;
let mut values = Vec::with_capacity(compiled_fields.len());
for compiled in compiled_fields {
let value = eval_canonical_scalar_projection_expr_with_required_value_reader(
compiled,
&mut |slot| {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_data_rows_slot_access(
projected_slot_mask.get(slot).copied().unwrap_or(false),
);
row_fields.required_value_by_contract(slot)
},
)?;
values.push(value);
}
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn project_generic_data_rows_from_projection_structural(
model: &'static EntityModel,
projection: &ProjectionSpec,
rows: &[DataRow],
projected_slot_mask: &[bool],
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(rows.len());
#[cfg(not(any(test, feature = "structural-read-metrics")))]
let _ = projected_slot_mask;
for (data_key, raw_row) in rows {
let row_fields = StructuralSlotReader::from_raw_row(raw_row, model)?;
row_fields.validate_storage_key(data_key)?;
let mut values = Vec::with_capacity(projection.len());
let mut slot_cache: Vec<Option<Value>> = vec![None; model.fields().len()];
let mut read_slot = |slot: usize| {
#[cfg(any(test, feature = "structural-read-metrics"))]
record_sql_projection_data_rows_slot_access(
projected_slot_mask.get(slot).copied().unwrap_or(false),
);
if slot_cache[slot].is_none() {
slot_cache[slot] = Some(row_fields.required_value_by_contract(slot)?);
}
slot_cache[slot].clone().ok_or_else(|| {
InternalError::executor_internal(format!(
"structural projection slot cache missing decoded value: slot={slot}",
))
})
};
visit_projection_values_with_required_value_reader(
projection,
model,
&mut read_slot,
&mut |value| values.push(value),
)?;
projected_rows.push(values);
}
Ok(projected_rows)
}
#[cfg(feature = "sql")]
fn direct_projection_slot_mask(
model: &'static EntityModel,
projection: &ProjectionSpec,
) -> Vec<bool> {
let mut projected_slots = vec![false; model.fields().len()];
for field in projection.fields() {
let Some(field_name) = projection_field_direct_field_name(field) else {
continue;
};
let Some(slot) = resolve_field_slot(model, field_name) else {
continue;
};
projected_slots[slot] = true;
}
projected_slots
}
#[cfg(any(test, feature = "structural-read-metrics"))]
#[cfg_attr(
all(test, not(feature = "structural-read-metrics")),
allow(unreachable_pub)
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SqlProjectionMaterializationMetrics {
pub projected_rows_path_hits: u64,
pub slot_rows_path_hits: u64,
pub data_rows_path_hits: u64,
pub data_rows_scalar_fallback_hits: u64,
pub data_rows_generic_fallback_hits: u64,
pub data_rows_projected_slot_accesses: u64,
pub data_rows_non_projected_slot_accesses: u64,
pub full_row_decode_materializations: u64,
}
#[cfg(any(test, feature = "structural-read-metrics"))]
std::thread_local! {
static SQL_PROJECTION_MATERIALIZATION_METRICS: RefCell<Option<SqlProjectionMaterializationMetrics>> = const {
RefCell::new(None)
};
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn update_sql_projection_materialization_metrics(
update: impl FnOnce(&mut SqlProjectionMaterializationMetrics),
) {
SQL_PROJECTION_MATERIALIZATION_METRICS.with(|metrics| {
let mut metrics = metrics.borrow_mut();
let Some(metrics) = metrics.as_mut() else {
return;
};
update(metrics);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn record_sql_projection_projected_rows_path_hit() {
update_sql_projection_materialization_metrics(|metrics| {
metrics.projected_rows_path_hits = metrics.projected_rows_path_hits.saturating_add(1);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn record_sql_projection_slot_rows_path_hit() {
update_sql_projection_materialization_metrics(|metrics| {
metrics.slot_rows_path_hits = metrics.slot_rows_path_hits.saturating_add(1);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn record_sql_projection_data_rows_path_hit() {
update_sql_projection_materialization_metrics(|metrics| {
metrics.data_rows_path_hits = metrics.data_rows_path_hits.saturating_add(1);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn record_sql_projection_data_rows_scalar_fallback_hit() {
update_sql_projection_materialization_metrics(|metrics| {
metrics.data_rows_scalar_fallback_hits =
metrics.data_rows_scalar_fallback_hits.saturating_add(1);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn record_sql_projection_data_rows_generic_fallback_hit() {
update_sql_projection_materialization_metrics(|metrics| {
metrics.data_rows_generic_fallback_hits =
metrics.data_rows_generic_fallback_hits.saturating_add(1);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
fn record_sql_projection_data_rows_slot_access(projected_slot: bool) {
update_sql_projection_materialization_metrics(|metrics| {
if projected_slot {
metrics.data_rows_projected_slot_accesses =
metrics.data_rows_projected_slot_accesses.saturating_add(1);
} else {
metrics.data_rows_non_projected_slot_accesses = metrics
.data_rows_non_projected_slot_accesses
.saturating_add(1);
}
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
pub(in crate::db::executor) fn record_sql_projection_full_row_decode_materialization() {
update_sql_projection_materialization_metrics(|metrics| {
metrics.full_row_decode_materializations =
metrics.full_row_decode_materializations.saturating_add(1);
});
}
#[cfg(any(test, feature = "structural-read-metrics"))]
#[cfg_attr(
all(test, not(feature = "structural-read-metrics")),
allow(dead_code, unreachable_pub)
)]
pub fn with_sql_projection_materialization_metrics<T>(
f: impl FnOnce() -> T,
) -> (T, SqlProjectionMaterializationMetrics) {
SQL_PROJECTION_MATERIALIZATION_METRICS.with(|metrics| {
debug_assert!(
metrics.borrow().is_none(),
"sql projection metrics captures should not nest"
);
*metrics.borrow_mut() = Some(SqlProjectionMaterializationMetrics::default());
});
let result = f();
let metrics = SQL_PROJECTION_MATERIALIZATION_METRICS
.with(|metrics| metrics.borrow_mut().take().unwrap_or_default());
(result, metrics)
}