mod cache;
mod direct;
mod metrics;
mod primary_key;
use crate::{
db::data::{
DataKey, RawRow, StructuralRowContract, StructuralRowDecodeError, StructuralRowFieldBytes,
persisted_row::{
codec::{ScalarSlotValueRef, decode_scalar_slot_value},
contract::{decode_slot_value_for_field, validate_non_scalar_slot_value},
reader::{
cache::{
ValidatedScalarSlotValue, build_initial_slot_cache,
materialize_validated_scalar_slot_value, scalar_slot_value_ref_from_validated,
validated_scalar_slot_value,
},
primary_key::validate_storage_key_from_primary_key_bytes_with_field,
},
types::{CanonicalSlotReader, SlotReader},
},
},
error::InternalError,
model::{
entity::EntityModel,
field::{FieldModel, LeafCodec},
},
value::{StorageKey, Value},
};
use std::{borrow::Cow, cell::OnceCell};
pub(in crate::db::data::persisted_row) use cache::CachedSlotValue;
pub(in crate::db) use direct::{
decode_dense_raw_row_with_contract, decode_sparse_indexed_raw_row_with_contract,
decode_sparse_raw_row_with_contract, decode_sparse_required_slot_with_contract,
decode_sparse_required_slot_with_contract_and_fields,
};
#[cfg(feature = "diagnostics")]
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
pub use metrics::{StructuralReadMetrics, with_structural_read_metrics};
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use metrics::{StructuralReadMetrics, with_structural_read_metrics};
/// Slot reader over one persisted raw row, driven by a structural row contract.
///
/// Readers are built either from an entity model or from a bare contract; per-slot
/// values are decoded on demand and remembered in `cached_values`.
pub(in crate::db) struct StructuralSlotReader<'a> {
// Entity model backing this reader: `Some` when built through the model
// constructor, `None` when built from a contract alone.
model: Option<&'static EntityModel>,
// Row contract consulted for field lookups, the entity path used in errors,
// and the primary-key slot when no model is attached.
contract: StructuralRowContract,
// Per-slot raw field bytes obtained from the source row; carries the `'a`
// lifetime of that row.
field_bytes: StructuralRowFieldBytes<'a>,
// Per-slot cache entries, looked up by slot index.
pub(in crate::db::data::persisted_row) cached_values: Vec<CachedSlotValue>,
// Read-metrics probe; only compiled for tests or the `diagnostics` feature.
#[cfg(any(test, feature = "diagnostics"))]
metrics: metrics::StructuralReadProbe,
}
impl<'a> StructuralSlotReader<'a> {
    /// Builds a model-backed reader over `raw_row` and validates every declared slot
    /// before handing the reader back.
    ///
    /// # Errors
    ///
    /// Fails when the row cannot be split into per-slot field bytes, or when any
    /// declared slot is missing or does not pass validation.
    pub(in crate::db) fn from_raw_row(
        raw_row: &'a RawRow,
        model: &'static EntityModel,
    ) -> Result<Self, InternalError> {
        let validated_reader = Self::from_raw_row_with_model(raw_row, model)?;
        validated_reader
            .validate_all_declared_slots()
            .map(|()| validated_reader)
    }

    /// Builds a reader from a contract alone.
    ///
    /// No entity model is attached and no slot is validated here; validation happens
    /// when a slot is first read through the cache.
    ///
    /// # Errors
    ///
    /// Fails when the row cannot be split into per-slot field bytes.
    pub(in crate::db) fn from_raw_row_with_contract(
        raw_row: &'a RawRow,
        contract: StructuralRowContract,
    ) -> Result<Self, InternalError> {
        Self::from_raw_row_parts(raw_row, contract, None)
    }

    /// Builds a reader whose contract is derived from `model`, without validating slots.
    fn from_raw_row_with_model(
        raw_row: &'a RawRow,
        model: &'static EntityModel,
    ) -> Result<Self, InternalError> {
        let contract = StructuralRowContract::from_model(model);
        Self::from_raw_row_parts(raw_row, contract, Some(model))
    }

    /// Shared construction path: extracts the field bytes, seeds the slot cache and
    /// (in test/diagnostics builds) starts the read-metrics probe.
    fn from_raw_row_parts(
        raw_row: &'a RawRow,
        contract: StructuralRowContract,
        model: Option<&'static EntityModel>,
    ) -> Result<Self, InternalError> {
        let field_bytes = StructuralRowFieldBytes::from_raw_row_with_contract(raw_row, contract)
            .map_err(StructuralRowDecodeError::into_internal_error)?;
        let cached_values = build_initial_slot_cache(contract);

        Ok(Self {
            model,
            contract,
            field_bytes,
            cached_values,
            #[cfg(any(test, feature = "diagnostics"))]
            metrics: metrics::StructuralReadProbe::begin(contract.field_count()),
        })
    }

    /// Returns the entity model attached to this reader.
    ///
    /// # Panics
    ///
    /// Panics when the reader was built without a model (contract-only construction).
    #[must_use]
    pub(in crate::db) const fn model(&self) -> &'static EntityModel {
        self.model
            .expect("model-backed structural slot reader required by typed slot-reader seam")
    }

    /// Checks that the row's primary-key slot matches the storage key of `data_key`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::validate_storage_key_value`].
    pub(in crate::db) fn validate_storage_key(
        &self,
        data_key: &DataKey,
    ) -> Result<(), InternalError> {
        let expected_key = data_key.storage_key();
        self.validate_storage_key_value(expected_key)
    }

    /// Validates the primary-key slot bytes against `expected_key`.
    ///
    /// The primary-key slot comes from the model when one is attached, otherwise from
    /// the contract. For a scalar primary key with a scalar cache entry, the slot is
    /// first run through the validated-scalar cache.
    fn validate_storage_key_value(&self, expected_key: StorageKey) -> Result<(), InternalError> {
        let primary_key_slot = self.model.map_or_else(
            || self.contract.primary_key_slot(),
            EntityModel::primary_key_slot,
        );
        let primary_key_field = self.field_model(primary_key_slot)?;
        let cached_primary_key = self.cached_values.get(primary_key_slot);

        if let (LeafCodec::Scalar(_), Some(CachedSlotValue::Scalar { validated, .. })) =
            (primary_key_field.leaf_codec(), cached_primary_key)
        {
            // Only the validation side effect matters here; the decoded value is unused.
            let _ = self.required_validated_scalar_slot_value(primary_key_slot, validated)?;
        }

        let primary_key_bytes =
            self.required_field_bytes(primary_key_slot, primary_key_field.name())?;
        validate_storage_key_from_primary_key_bytes_with_field(
            primary_key_bytes,
            primary_key_field,
            expected_key,
        )
    }

    /// Looks up the contract field for `slot`.
    ///
    /// # Errors
    ///
    /// Returns a slot-lookup out-of-bounds error when the contract has no such slot.
    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
        let Some(field) = self.contract.fields().get(slot) else {
            return Err(InternalError::persisted_row_slot_lookup_out_of_bounds(
                self.contract.entity_path(),
                slot,
            ));
        };

        Ok(field)
    }

    /// Returns the validated scalar for `slot`, decoding it and storing it in
    /// `validated` on the first call.
    ///
    /// # Errors
    ///
    /// Fails when the slot is out of bounds, its bytes are missing, the field is not
    /// scalar-coded, or the scalar payload does not decode.
    fn required_validated_scalar_slot_value(
        &self,
        slot: usize,
        validated: &OnceCell<ValidatedScalarSlotValue>,
    ) -> Result<ValidatedScalarSlotValue, InternalError> {
        // Fast path: this slot was already validated.
        if let Some(&already_validated) = validated.get() {
            return Ok(already_validated);
        }

        let field = self.field_model(slot)?;
        let raw_value = self.required_field_bytes(slot, field.name())?;
        let LeafCodec::Scalar(codec) = field.leaf_codec() else {
            return Err(InternalError::persisted_row_decode_failed(format!(
                "validated scalar cache routed through non-scalar field contract: slot={slot}",
            )));
        };

        #[cfg(any(test, feature = "diagnostics"))]
        self.metrics.record_validated_slot();

        let decoded = decode_scalar_slot_value(raw_value, codec, field.name())?;
        let validated_value = validated_scalar_slot_value(decoded);
        // The cell was empty above; a failed `set` is deliberately ignored.
        let _ = validated.set(validated_value);

        Ok(validated_value)
    }

    /// Returns the materialized [`Value`] for `slot`, decoding and caching it on first
    /// access.
    ///
    /// # Errors
    ///
    /// Fails when `slot` has no cache entry, the field or its bytes are missing, or the
    /// stored payload fails validation or decoding.
    pub(in crate::db::data::persisted_row) fn required_cached_value(
        &self,
        slot: usize,
    ) -> Result<&Value, InternalError> {
        let Some(cache_entry) = self.cached_values.get(slot) else {
            return Err(
                InternalError::persisted_row_slot_cache_lookup_out_of_bounds(
                    self.contract.entity_path(),
                    slot,
                ),
            );
        };

        match cache_entry {
            CachedSlotValue::Scalar {
                validated,
                materialized,
            } => {
                // Validation always runs first, even when the value is already materialized.
                let validated_scalar =
                    self.required_validated_scalar_slot_value(slot, validated)?;
                if let Some(value) = materialized.get() {
                    return Ok(value);
                }

                let value = materialize_validated_scalar_slot_value(
                    validated_scalar,
                    self.contract,
                    &self.field_bytes,
                    slot,
                )?;
                let _ = materialized.set(value);

                materialized.get().ok_or_else(|| {
                    InternalError::persisted_row_decode_failed(format!(
                        "structural scalar cache failed to materialize deferred value: slot={slot}",
                    ))
                })
            }
            CachedSlotValue::Deferred { materialized } => {
                // Field and byte lookups run before the cache check, so their errors
                // surface regardless of cache state.
                let field = self.field_model(slot)?;
                let raw_value = self.required_field_bytes(slot, field.name())?;
                if let Some(value) = materialized.get() {
                    return Ok(value);
                }

                #[cfg(any(test, feature = "diagnostics"))]
                {
                    self.metrics.record_validated_slot();
                    self.metrics.record_validated_non_scalar();
                    self.metrics.record_materialized_non_scalar();
                }

                validate_non_scalar_slot_value(raw_value, field)?;
                let value = decode_slot_value_for_field(field, raw_value)?;
                let _ = materialized.set(value);

                materialized.get().ok_or_else(|| {
                    InternalError::persisted_row_decode_failed(format!(
                        "structural slot cache failed to materialize deferred value: slot={slot}",
                    ))
                })
            }
        }
    }

    /// Returns the raw bytes stored for `slot`.
    ///
    /// # Errors
    ///
    /// Returns a declared-field-missing error naming `field_name` when the row holds
    /// no bytes for the slot.
    pub(in crate::db) fn required_field_bytes(
        &self,
        slot: usize,
        field_name: &str,
    ) -> Result<&[u8], InternalError> {
        let Some(bytes) = self.field_bytes.field(slot) else {
            return Err(InternalError::persisted_row_declared_field_missing(
                field_name,
            ));
        };

        Ok(bytes)
    }

    /// Walks every contract field in slot order and validates its stored bytes.
    ///
    /// Scalar fields are decoded (the decoded value is discarded); other fields go
    /// through the non-scalar validator. Stops at the first failure.
    fn validate_all_declared_slots(&self) -> Result<(), InternalError> {
        let declared_fields = self.contract.fields();

        for (slot, field) in declared_fields.iter().enumerate() {
            let raw_value = self.required_field_bytes(slot, field.name())?;

            // Every slot counts as one validation, whichever codec it uses.
            #[cfg(any(test, feature = "diagnostics"))]
            self.metrics.record_validated_slot();

            match field.leaf_codec() {
                LeafCodec::Scalar(codec) => {
                    decode_scalar_slot_value(raw_value, codec, field.name())?;
                }
                LeafCodec::StructuralFallback => {
                    #[cfg(any(test, feature = "diagnostics"))]
                    self.metrics.record_validated_non_scalar();

                    validate_non_scalar_slot_value(raw_value, field)?;
                }
            }
        }

        Ok(())
    }
}
impl SlotReader for StructuralSlotReader<'_> {
    /// Returns the attached entity model via the inherent accessor.
    ///
    /// Panics when the reader was built without a model.
    fn model(&self) -> &'static EntityModel {
        self.model()
    }

    /// Reports whether the row holds bytes for `slot`.
    fn has(&self, slot: usize) -> bool {
        self.field_bytes.field(slot).is_some()
    }

    /// Returns the raw bytes stored for `slot`, if any.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
        self.field_bytes.field(slot)
    }

    /// Returns a borrowed scalar view of `slot`, or `Ok(None)` when the field is not
    /// scalar-coded.
    ///
    /// # Errors
    ///
    /// Fails when the slot is unknown to the contract, when a scalar field is paired
    /// with a missing or non-scalar cache entry, or when validation of the stored
    /// scalar fails.
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
        let field = self.field_model(slot)?;
        if matches!(field.leaf_codec(), LeafCodec::StructuralFallback) {
            return Ok(None);
        }

        match self.cached_values.get(slot) {
            Some(CachedSlotValue::Scalar { validated, .. }) => {
                let validated_scalar =
                    self.required_validated_scalar_slot_value(slot, validated)?;
                let scalar_ref = scalar_slot_value_ref_from_validated(
                    validated_scalar,
                    self.contract,
                    &self.field_bytes,
                    slot,
                )?;

                Ok(Some(scalar_ref))
            }
            Some(CachedSlotValue::Deferred { .. }) => {
                Err(InternalError::persisted_row_decode_failed(format!(
                    "structural scalar slot routed through non-scalar cache variant: slot={slot}",
                )))
            }
            None => Err(
                InternalError::persisted_row_slot_cache_lookup_out_of_bounds(
                    self.contract.entity_path(),
                    slot,
                ),
            ),
        }
    }

    /// Returns an owned copy of the cached value for `slot`.
    ///
    /// Never yields `Ok(None)`: a slot that cannot be read surfaces as an error.
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
        self.required_cached_value(slot).cloned().map(Some)
    }
}
impl CanonicalSlotReader for StructuralSlotReader<'_> {
    /// Returns the raw bytes for `slot`.
    ///
    /// # Errors
    ///
    /// Fails when the slot is unknown to the contract or the row holds no bytes for it.
    fn required_bytes(&self, slot: usize) -> Result<&[u8], InternalError> {
        let field = self.field_model(slot)?;
        let Some(bytes) = self.get_bytes(slot) else {
            return Err(InternalError::persisted_row_declared_field_missing(
                field.name(),
            ));
        };

        Ok(bytes)
    }

    /// Returns the scalar view of `slot`.
    ///
    /// Debug builds assert that the field is scalar-coded; in release builds a
    /// non-scalar field is reported as a missing declared field.
    fn required_scalar(&self, slot: usize) -> Result<ScalarSlotValueRef<'_>, InternalError> {
        let field = self.field_model(slot)?;
        debug_assert!(matches!(field.leaf_codec(), LeafCodec::Scalar(_)));

        let Some(scalar) = self.get_scalar(slot)? else {
            return Err(InternalError::persisted_row_declared_field_missing(
                field.name(),
            ));
        };

        Ok(scalar)
    }

    /// Returns an owned copy of the cached value for `slot`.
    fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
        self.required_cached_value(slot).cloned()
    }

    /// Returns the cached value for `slot` as a borrowed [`Cow`], without cloning.
    fn required_value_by_contract_cow(&self, slot: usize) -> Result<Cow<'_, Value>, InternalError> {
        self.required_cached_value(slot).map(Cow::Borrowed)
    }
}