use crate::{
db::{
data::{
RawRow, SparseRequiredRowFieldBytes, StructuralFieldDecodeContract,
StructuralRowContract, StructuralRowDecodeError, StructuralRowFieldBytes,
persisted_row::{
codec::{ScalarSlotValueRef, decode_scalar_slot_value},
contract::{
decode_runtime_value_from_accepted_field_contract,
decode_runtime_value_from_field_contract, validate_non_scalar_slot_value,
},
reader::{
metrics::{StructuralReadProbe, finish_direct_probe},
primary_key::{
materialize_primary_key_slot_value_from_expected_key,
materialize_primary_key_slot_value_from_expected_key_with_accepted_field,
validate_storage_key_from_field_bytes,
validate_storage_key_from_primary_key_bytes_with_contract,
},
},
},
},
schema::AcceptedFieldDecodeContract,
},
error::InternalError,
model::field::LeafCodec,
value::{StorageKey, Value},
};
/// Field-level view over one raw row, kept together with the contract and the
/// storage key the row is expected to carry.
///
/// Constructed through `open`, which validates the storage key before the
/// view is handed out; individual slots are then decoded with `decode_slot`.
struct DirectStructuralRowFields<'a> {
// Row contract used both to split the row and to decode each slot.
contract: StructuralRowContract,
// Storage key the caller expects this row to be stored under.
expected_key: StorageKey,
// Per-field byte view produced from the raw row under `contract`.
field_bytes: StructuralRowFieldBytes<'a>,
}
impl<'a> DirectStructuralRowFields<'a> {
    /// Builds the field view for `raw_row` under `contract` and validates its storage key.
    ///
    /// # Errors
    /// Returns the structural decode error (converted to `InternalError`) when the row
    /// cannot be turned into field bytes, or whatever error storage-key validation against
    /// `expected_key` reports.
    fn open(
        raw_row: &'a RawRow,
        contract: StructuralRowContract,
        expected_key: StorageKey,
    ) -> Result<Self, InternalError> {
        let split_row =
            StructuralRowFieldBytes::from_raw_row_with_contract(raw_row, contract.clone());
        let field_bytes = split_row.map_err(StructuralRowDecodeError::into_internal_error)?;

        // The key check runs before the view is returned, so a constructed value
        // always refers to a row whose key validation succeeded.
        validate_storage_key_from_field_bytes(contract.clone(), &field_bytes, expected_key)?;

        Ok(Self {
            contract,
            expected_key,
            field_bytes,
        })
    }

    /// Decodes the value stored at `slot`, recording read metrics on `probe`.
    ///
    /// Forwards to `decode_selected_slot_value` with a clone of the stored contract.
    fn decode_slot(
        &self,
        slot: usize,
        probe: &StructuralReadProbe,
    ) -> Result<Value, InternalError> {
        let contract = self.contract.clone();
        decode_selected_slot_value(contract, &self.field_bytes, slot, self.expected_key, probe)
    }
}
/// View over one raw row that targets a single required slot, kept together
/// with the contract and the storage key the row is expected to carry.
///
/// Constructed through `open`, which validates the storage key from the
/// row's primary-key bytes; the required slot is then decoded with `decode`.
struct DirectSparseRequiredRowField<'a> {
// Row contract used both to split the row and to decode the required slot.
contract: StructuralRowContract,
// Storage key the caller expects this row to be stored under.
expected_key: StorageKey,
// Index of the one slot this view decodes.
required_slot: usize,
// Byte view exposing the primary-key field and the required field.
field_bytes: SparseRequiredRowFieldBytes<'a>,
}
impl<'a> DirectSparseRequiredRowField<'a> {
    /// Builds the single-slot view for `raw_row` and validates its storage key.
    ///
    /// # Errors
    /// Returns the structural decode error (converted to `InternalError`) when the row
    /// cannot be turned into sparse field bytes, or whatever error the primary-key based
    /// storage-key validation against `expected_key` reports.
    fn open(
        raw_row: &'a RawRow,
        contract: StructuralRowContract,
        expected_key: StorageKey,
        required_slot: usize,
    ) -> Result<Self, InternalError> {
        let split_row = SparseRequiredRowFieldBytes::from_raw_row_with_contract(
            raw_row,
            contract.clone(),
            required_slot,
        );
        let field_bytes = split_row.map_err(StructuralRowDecodeError::into_internal_error)?;

        // Validate against the primary-key bytes before handing the view out.
        let primary_key_bytes = field_bytes.primary_key_field();
        validate_storage_key_from_primary_key_bytes_with_contract(
            &contract,
            primary_key_bytes,
            expected_key,
        )?;

        Ok(Self {
            contract,
            expected_key,
            required_slot,
            field_bytes,
        })
    }

    /// Decodes the required slot, recording read metrics on `probe`.
    ///
    /// When the row carries no bytes for the required slot, the result is whatever the
    /// contract's `missing_slot_value` yields for that slot.
    fn decode(&self, probe: &StructuralReadProbe) -> Result<Value, InternalError> {
        match self.field_bytes.required_field() {
            None => self.contract.missing_slot_value(self.required_slot),
            Some(raw_value) => {
                let is_primary = self.required_slot == self.contract.primary_key_slot();
                decode_slot_with_contract(
                    &self.contract,
                    self.required_slot,
                    raw_value,
                    self.expected_key,
                    is_primary,
                    probe,
                )
            }
        }
    }
}
pub(in crate::db) fn decode_dense_raw_row_with_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: StorageKey,
) -> Result<Vec<Option<Value>>, InternalError> {
let fields = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
let mut values = Vec::with_capacity(contract.field_count());
let probe = StructuralReadProbe::begin(contract.field_count());
for slot in 0..contract.field_count() {
values.push(Some(fields.decode_slot(slot, &probe)?));
}
Ok(values)
}
/// Decodes only `required_slots` of `raw_row`, returning a slot-addressed vector.
///
/// The result has one entry per contract field; entries for slots listed in
/// `required_slots` are `Some`, all others stay `None`.
///
/// # Errors
/// Returns the error from opening the row or from decoding any requested slot; the read
/// probe is only finished on success.
pub(in crate::db) fn decode_sparse_raw_row_with_contract(
    raw_row: &RawRow,
    contract: StructuralRowContract,
    expected_key: StorageKey,
    required_slots: &[usize],
) -> Result<Vec<Option<Value>>, InternalError> {
    let row = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
    let mut decoded: Vec<Option<Value>> = vec![None; contract.field_count()];
    let read_probe = StructuralReadProbe::begin(contract.field_count());

    for slot in required_slots.iter().copied() {
        // Decode first, then store: the slot is only indexed after a successful decode.
        let value = row.decode_slot(slot, &read_probe)?;
        decoded[slot] = Some(value);
    }

    finish_direct_probe(&read_probe);
    Ok(decoded)
}
/// Decodes `required_slots` of `raw_row`, returning values in request order.
///
/// Unlike the slot-addressed sparse decode, entry `i` of the result corresponds to
/// `required_slots[i]`, and every entry is `Some`.
///
/// # Errors
/// Returns the error from opening the row or from decoding any requested slot; the read
/// probe is only finished on success.
pub(in crate::db) fn decode_sparse_indexed_raw_row_with_contract(
    raw_row: &RawRow,
    contract: StructuralRowContract,
    expected_key: StorageKey,
    required_slots: &[usize],
) -> Result<Vec<Option<Value>>, InternalError> {
    let row = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
    let mut decoded = Vec::with_capacity(required_slots.len());
    let read_probe = StructuralReadProbe::begin(contract.field_count());

    for slot in required_slots.iter().copied() {
        let value = row.decode_slot(slot, &read_probe)?;
        decoded.push(Some(value));
    }

    finish_direct_probe(&read_probe);
    Ok(decoded)
}
/// Decodes the single `required_slot` of `raw_row` under `contract`.
///
/// This is the `crate::db`-visible entry point; it forwards its arguments
/// unchanged to the private `decode_sparse_required_slot` and returns that
/// function's result.
pub(in crate::db) fn decode_sparse_required_slot_with_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: StorageKey,
required_slot: usize,
) -> Result<Option<Value>, InternalError> {
decode_sparse_required_slot(raw_row, contract, expected_key, required_slot)
}
/// Opens a single-slot view over `raw_row` and decodes `required_slot`.
///
/// On success the decoded value is always wrapped in `Some`.
///
/// # Errors
/// Returns the error from opening the row or from decoding the slot; the read probe is
/// only finished on success.
fn decode_sparse_required_slot(
    raw_row: &RawRow,
    contract: StructuralRowContract,
    expected_key: StorageKey,
    required_slot: usize,
) -> Result<Option<Value>, InternalError> {
    let opened =
        DirectSparseRequiredRowField::open(raw_row, contract.clone(), expected_key, required_slot)?;
    let read_probe = StructuralReadProbe::begin(contract.field_count());

    let decoded = opened.decode(&read_probe).map(Some)?;

    finish_direct_probe(&read_probe);
    Ok(decoded)
}
/// Decodes the value at `slot` from an already-split row.
///
/// When `field_bytes` has no bytes for `slot`, the result is whatever the contract's
/// `missing_slot_value` yields for that slot. Otherwise the bytes are decoded through
/// `decode_slot_with_contract`, flagging the slot as primary when it equals the
/// contract's primary-key slot.
fn decode_selected_slot_value(
    contract: StructuralRowContract,
    field_bytes: &StructuralRowFieldBytes<'_>,
    slot: usize,
    expected_key: StorageKey,
    probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
    match field_bytes.field(slot) {
        None => contract.missing_slot_value(slot),
        Some(raw_value) => {
            let is_primary = slot == contract.primary_key_slot();
            decode_slot_with_contract(&contract, slot, raw_value, expected_key, is_primary, probe)
        }
    }
}
/// Decodes one slot, preferring the accepted-field decode contract when the row contract
/// provides one for `slot`.
///
/// Dispatch:
/// - accepted contract present, primary slot: the value is materialized from
///   `expected_key`; `raw_value` is not passed to the materializer.
/// - accepted contract present, other slot: `raw_value` is decoded with the accepted
///   contract.
/// - no accepted contract: falls back to the structural field decode contract, which
///   handles both primary and non-primary slots.
///
/// # Errors
/// Returns the error from looking up the structural field contract or from the selected
/// decode path.
fn decode_slot_with_contract(
    contract: &StructuralRowContract,
    slot: usize,
    raw_value: &[u8],
    expected_key: StorageKey,
    is_primary: bool,
    probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
    match (contract.accepted_field_decode_contract(slot), is_primary) {
        (Some(accepted_field), true) => {
            materialize_primary_key_slot_value_from_expected_key_with_accepted_field(
                accepted_field,
                expected_key,
                probe,
            )
        }
        (Some(accepted_field), false) => {
            decode_slot_with_accepted_field(accepted_field, raw_value, probe)
        }
        (None, _) => {
            let structural_field = contract.field_decode_contract(slot)?;
            decode_slot_with_field(structural_field, raw_value, expected_key, is_primary, probe)
        }
    }
}
/// Decodes a non-primary slot using an accepted-field decode contract.
///
/// Every call records one validated slot on `probe`. Structural-fallback fields also
/// record a validated and a materialized non-scalar before the runtime value is decoded.
///
/// # Errors
/// Returns the error from scalar slot decoding or from the accepted-contract runtime
/// value decode.
fn decode_slot_with_accepted_field(
    field: AcceptedFieldDecodeContract<'_>,
    raw_value: &[u8],
    probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
    let leaf = field.leaf_codec();
    // Both codec kinds count as one validated slot.
    probe.record_validated_slot();

    match leaf {
        LeafCodec::StructuralFallback => {
            probe.record_validated_non_scalar();
            probe.record_materialized_non_scalar();
            decode_runtime_value_from_accepted_field_contract(field, raw_value)
        }
        LeafCodec::Scalar(codec) => {
            let decoded = decode_scalar_slot_value(raw_value, codec, field.field_name())?;
            Ok(match decoded {
                ScalarSlotValueRef::Null => Value::Null,
                ScalarSlotValueRef::Value(scalar) => scalar.into_value(),
            })
        }
    }
}
/// Decodes one slot using the structural field decode contract.
///
/// A primary slot is materialized from `expected_key`; `raw_value` is not passed to the
/// materializer and nothing is recorded on `probe` by this function for that path.
/// For other slots, one validated slot is recorded; structural-fallback fields also
/// record a validated and a materialized non-scalar, and their bytes are validated
/// before the runtime value is decoded.
///
/// # Errors
/// Returns the error from primary-key materialization, scalar slot decoding, non-scalar
/// validation, or the runtime value decode.
fn decode_slot_with_field(
    field: StructuralFieldDecodeContract,
    raw_value: &[u8],
    expected_key: StorageKey,
    is_primary: bool,
    probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
    if is_primary {
        materialize_primary_key_slot_value_from_expected_key(field, expected_key, probe)
    } else {
        let leaf = field.leaf_codec();
        // Both codec kinds count as one validated slot.
        probe.record_validated_slot();

        match leaf {
            LeafCodec::StructuralFallback => {
                probe.record_validated_non_scalar();
                probe.record_materialized_non_scalar();
                validate_non_scalar_slot_value(raw_value, field)?;
                decode_runtime_value_from_field_contract(field, raw_value)
            }
            LeafCodec::Scalar(codec) => {
                let decoded = decode_scalar_slot_value(raw_value, codec, field.name())?;
                Ok(match decoded {
                    ScalarSlotValueRef::Null => Value::Null,
                    ScalarSlotValueRef::Value(scalar) => scalar.into_value(),
                })
            }
        }
    }
}