use crate::{
db::{
data::{
RawRow, SparseRequiredRowFieldBytes, StructuralFieldDecodeContract,
StructuralRowContract, StructuralRowDecodeError, StructuralRowFieldBytes,
persisted_row::{
codec::{ScalarSlotValueRef, decode_scalar_slot_value},
contract::{
decode_runtime_value_from_accepted_field_contract,
decode_runtime_value_from_field_contract, validate_non_scalar_slot_value,
},
reader::{
metrics::{StructuralReadProbe, finish_direct_probe},
primary_key::{
materialize_primary_key_slot_value_from_expected_component,
materialize_primary_key_slot_value_from_expected_component_with_accepted_field,
validate_primary_key_component_from_slot_bytes_with_contract,
validate_primary_key_value_from_field_bytes,
},
},
},
},
key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
schema::AcceptedFieldDecodeContract,
},
error::InternalError,
model::field::LeafCodec,
value::Value,
};
struct DirectStructuralRowFields<'a> {
contract: StructuralRowContract,
expected_key: PrimaryKeyValue,
field_bytes: StructuralRowFieldBytes<'a>,
}
impl<'a> DirectStructuralRowFields<'a> {
fn open(
raw_row: &'a RawRow,
contract: StructuralRowContract,
expected_key: &PrimaryKeyValue,
) -> Result<Self, InternalError> {
let field_bytes =
StructuralRowFieldBytes::from_raw_row_with_contract(raw_row, contract.clone())
.map_err(StructuralRowDecodeError::into_internal_error)?;
validate_primary_key_value_from_field_bytes(contract.clone(), &field_bytes, expected_key)?;
Ok(Self {
contract,
expected_key: *expected_key,
field_bytes,
})
}
fn decode_slot(
&self,
slot: usize,
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
decode_selected_slot_value(
self.contract.clone(),
&self.field_bytes,
slot,
&self.expected_key,
probe,
)
}
}
struct DirectSparseRequiredRowField<'a> {
contract: StructuralRowContract,
expected_key: PrimaryKeyComponent,
required_slot: usize,
field_bytes: SparseRequiredRowFieldBytes<'a>,
}
impl<'a> DirectSparseRequiredRowField<'a> {
fn open(
raw_row: &'a RawRow,
contract: StructuralRowContract,
expected_key: PrimaryKeyComponent,
required_slot: usize,
) -> Result<Self, InternalError> {
let field_bytes = SparseRequiredRowFieldBytes::from_raw_row_with_contract(
raw_row,
contract.clone(),
required_slot,
)
.map_err(StructuralRowDecodeError::into_internal_error)?;
validate_primary_key_component_from_slot_bytes_with_contract(
&contract,
contract.primary_key_slot(),
field_bytes.primary_key_field(),
expected_key,
)?;
Ok(Self {
contract,
expected_key,
required_slot,
field_bytes,
})
}
fn decode(&self, probe: &StructuralReadProbe) -> Result<Value, InternalError> {
let Some(raw_value) = self.field_bytes.required_field() else {
return self.contract.missing_slot_value(self.required_slot);
};
decode_slot_with_contract(
&self.contract,
self.required_slot,
raw_value,
(self.required_slot == self.contract.primary_key_slot()).then_some(self.expected_key),
probe,
)
}
}
pub(in crate::db) fn decode_dense_raw_row_with_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: &PrimaryKeyValue,
) -> Result<Vec<Option<Value>>, InternalError> {
let fields = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
let mut values = Vec::with_capacity(contract.field_count());
let probe = StructuralReadProbe::begin(contract.field_count());
for slot in 0..contract.field_count() {
if contract.has_active_field_slot(slot) {
values.push(Some(fields.decode_slot(slot, &probe)?));
} else {
values.push(None);
}
}
Ok(values)
}
pub(in crate::db) fn decode_sparse_raw_row_with_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: &PrimaryKeyValue,
required_slots: &[usize],
) -> Result<Vec<Option<Value>>, InternalError> {
let fields = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
let mut values = vec![None; contract.field_count()];
let probe = StructuralReadProbe::begin(contract.field_count());
for &slot in required_slots {
values[slot] = Some(fields.decode_slot(slot, &probe)?);
}
finish_direct_probe(&probe);
Ok(values)
}
pub(in crate::db) fn decode_sparse_indexed_raw_row_with_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: &PrimaryKeyValue,
required_slots: &[usize],
) -> Result<Vec<Option<Value>>, InternalError> {
let fields = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
let mut values = Vec::with_capacity(required_slots.len());
let probe = StructuralReadProbe::begin(contract.field_count());
for &slot in required_slots {
values.push(Some(fields.decode_slot(slot, &probe)?));
}
finish_direct_probe(&probe);
Ok(values)
}
pub(in crate::db) fn decode_sparse_required_slot_with_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: &PrimaryKeyValue,
required_slot: usize,
) -> Result<Option<Value>, InternalError> {
decode_sparse_required_slot(raw_row, contract, expected_key, required_slot)
}
fn decode_sparse_required_slot(
raw_row: &RawRow,
contract: StructuralRowContract,
expected_key: &PrimaryKeyValue,
required_slot: usize,
) -> Result<Option<Value>, InternalError> {
if matches!(expected_key, PrimaryKeyValue::Composite(_)) {
let fields = DirectStructuralRowFields::open(raw_row, contract.clone(), expected_key)?;
let probe = StructuralReadProbe::begin(contract.field_count());
let value = fields.decode_slot(required_slot, &probe)?;
finish_direct_probe(&probe);
return Ok(Some(value));
}
let PrimaryKeyValue::Scalar(expected_key) = *expected_key else {
unreachable!("persisted row invariant");
};
let field =
DirectSparseRequiredRowField::open(raw_row, contract.clone(), expected_key, required_slot)?;
let probe = StructuralReadProbe::begin(contract.field_count());
let value = field.decode(&probe)?;
finish_direct_probe(&probe);
Ok(Some(value))
}
fn decode_selected_slot_value(
contract: StructuralRowContract,
field_bytes: &StructuralRowFieldBytes<'_>,
slot: usize,
expected_key: &PrimaryKeyValue,
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
let Some(raw_value) = field_bytes.field(slot) else {
return contract.missing_slot_value(slot);
};
let expected_primary_key_component =
expected_primary_key_component_for_slot(&contract, expected_key, slot)?;
decode_slot_with_contract(
&contract,
slot,
raw_value,
expected_primary_key_component,
probe,
)
}
fn decode_slot_with_contract(
contract: &StructuralRowContract,
slot: usize,
raw_value: &[u8],
expected_primary_key_component: Option<PrimaryKeyComponent>,
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
if contract.has_accepted_decode_contract() {
return decode_slot_with_accepted_contract(
contract,
slot,
raw_value,
expected_primary_key_component,
probe,
);
}
decode_slot_with_generated_contract(
contract,
slot,
raw_value,
expected_primary_key_component,
probe,
)
}
fn decode_slot_with_accepted_contract(
contract: &StructuralRowContract,
slot: usize,
raw_value: &[u8],
expected_primary_key_component: Option<PrimaryKeyComponent>,
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
let accepted_field = contract.required_accepted_field_decode_contract(slot)?;
if let Some(expected_key) = expected_primary_key_component {
return materialize_primary_key_slot_value_from_expected_component_with_accepted_field(
accepted_field,
expected_key,
probe,
);
}
decode_slot_with_accepted_field(accepted_field, raw_value, probe)
}
fn decode_slot_with_generated_contract(
contract: &StructuralRowContract,
slot: usize,
raw_value: &[u8],
expected_primary_key_component: Option<PrimaryKeyComponent>,
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
let field = contract.field_decode_contract(slot)?;
decode_slot_with_field(field, raw_value, expected_primary_key_component, probe)
}
fn decode_slot_with_accepted_field(
field: AcceptedFieldDecodeContract<'_>,
raw_value: &[u8],
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
match field.leaf_codec() {
LeafCodec::Scalar(codec) => {
probe.record_validated_slot();
match decode_scalar_slot_value(raw_value, codec, field.field_name())? {
ScalarSlotValueRef::Null => Ok(Value::Null),
ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
}
}
LeafCodec::StructuralFallback => {
probe.record_validated_slot();
probe.record_validated_non_scalar();
probe.record_materialized_non_scalar();
decode_runtime_value_from_accepted_field_contract(field, raw_value)
}
}
}
fn decode_slot_with_field(
field: StructuralFieldDecodeContract,
raw_value: &[u8],
expected_primary_key_component: Option<PrimaryKeyComponent>,
probe: &StructuralReadProbe,
) -> Result<Value, InternalError> {
if let Some(expected_key) = expected_primary_key_component {
return materialize_primary_key_slot_value_from_expected_component(
field,
expected_key,
probe,
);
}
match field.leaf_codec() {
LeafCodec::Scalar(codec) => {
probe.record_validated_slot();
match decode_scalar_slot_value(raw_value, codec, field.name())? {
ScalarSlotValueRef::Null => Ok(Value::Null),
ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
}
}
LeafCodec::StructuralFallback => {
probe.record_validated_slot();
probe.record_validated_non_scalar();
probe.record_materialized_non_scalar();
validate_non_scalar_slot_value(raw_value, field)?;
decode_runtime_value_from_field_contract(field, raw_value)
}
}
}
fn expected_primary_key_component_for_slot(
contract: &StructuralRowContract,
expected_key: &PrimaryKeyValue,
slot: usize,
) -> Result<Option<PrimaryKeyComponent>, InternalError> {
match *expected_key {
PrimaryKeyValue::Scalar(component) => {
Ok((slot == contract.primary_key_slot()).then_some(component))
}
PrimaryKeyValue::Composite(composite) => {
let slots = contract.primary_key_slot_indices();
if slots.len() != composite.len() {
return Err(InternalError::persisted_row_decode_failed(format!(
"composite primary-key slot count mismatch: expected {} slots, row contract has {}",
composite.len(),
slots.len(),
)));
}
Ok(slots
.iter()
.position(|primary_key_slot| *primary_key_slot == slot)
.map(|component_index| composite.components()[component_index]))
}
}
}