#[cfg(test)]
use crate::db::data::persisted_row::{
contract::{
canonical_row_from_payload_source,
canonical_row_from_runtime_value_source_with_generated_contract,
},
writer::CompleteSerializedPatchWriter,
};
use crate::{
db::{
data::{
CanonicalRow, RawRow, StructuralRowContract,
persisted_row::{
codec::ScalarSlotValueRef,
contract::{
canonical_row_from_runtime_value_source_with_accepted_contract,
decode_runtime_value_from_row_contract,
decode_scalar_slot_value_from_row_contract,
encode_runtime_value_for_accepted_field_contract,
},
reader::StructuralSlotReader,
types::{
FieldSlot, PersistedRow, SerializedStructuralFieldUpdate,
SerializedStructuralPatch, SlotReader, StructuralPatch,
},
},
},
schema::AcceptedRowDecodeContract,
},
error::InternalError,
model::{entity::EntityModel, field::FieldModel},
traits::EntityValue,
value::Value,
};
use std::borrow::Cow;
struct SerializedPatchPayloads<'a> {
contract: StructuralRowContract,
generated_fields: &'static [FieldModel],
payloads: Vec<Option<&'a [u8]>>,
}
impl<'a> SerializedPatchPayloads<'a> {
#[cfg(test)]
fn new_for_generated_model_for_test(
model: &'static EntityModel,
patch: &'a SerializedStructuralPatch,
) -> Result<Self, InternalError> {
Self::from_contract(
StructuralRowContract::from_generated_model_for_test(model),
model.fields(),
patch,
)
}
fn new_with_accepted_contract(
model: &'static EntityModel,
accepted_decode_contract: AcceptedRowDecodeContract,
patch: &'a SerializedStructuralPatch,
) -> Result<Self, InternalError> {
Self::from_contract(
StructuralRowContract::from_accepted_decode_contract(
model.path(),
accepted_decode_contract,
),
model.fields(),
patch,
)
}
fn from_contract(
contract: StructuralRowContract,
generated_fields: &'static [FieldModel],
patch: &'a SerializedStructuralPatch,
) -> Result<Self, InternalError> {
let mut payloads = vec![None; contract.field_count()];
for entry in patch.entries() {
let slot = entry.slot().index();
Self::validate_payload_slot(&contract, generated_fields, slot)?;
payloads[slot] = Some(entry.payload());
}
Ok(Self {
contract,
generated_fields,
payloads,
})
}
fn generated_compatible_field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
self.generated_fields.get(slot).ok_or_else(|| {
InternalError::persisted_row_slot_lookup_out_of_bounds(
self.contract.entity_path(),
slot,
)
})
}
fn validate_payload_slot(
contract: &StructuralRowContract,
generated_fields: &'static [FieldModel],
slot: usize,
) -> Result<(), InternalError> {
if contract.has_accepted_decode_contract() {
let _ = contract.required_accepted_field_decode_contract(slot)?;
return Ok(());
}
generated_fields.get(slot).map(|_| ()).ok_or_else(|| {
InternalError::persisted_row_slot_lookup_out_of_bounds(contract.entity_path(), slot)
})
}
fn has(&self, slot: usize) -> bool {
self.payloads.get(slot).is_some_and(Option::is_some)
}
fn get(&self, slot: usize) -> Option<&[u8]> {
self.payloads.get(slot).copied().flatten()
}
#[cfg(test)]
fn required_complete_payload(&self, slot: usize) -> Result<&[u8], InternalError> {
self.get(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"serialized patch did not emit slot {slot} for entity '{}'",
self.contract.entity_path()
))
})
}
}
struct SerializedPatchSlotReader<'a> {
payloads: SerializedPatchPayloads<'a>,
decoded: Vec<Option<Value>>,
}
impl<'a> SerializedPatchSlotReader<'a> {
#[cfg(test)]
fn new(
model: &'static EntityModel,
patch: &'a SerializedStructuralPatch,
) -> Result<Self, InternalError> {
let payloads = SerializedPatchPayloads::new_for_generated_model_for_test(model, patch)?;
let decoded = vec![None; payloads.contract.field_count()];
Ok(Self { payloads, decoded })
}
fn new_with_accepted_contract(
model: &'static EntityModel,
accepted_decode_contract: AcceptedRowDecodeContract,
patch: &'a SerializedStructuralPatch,
) -> Result<Self, InternalError> {
let payloads = SerializedPatchPayloads::new_with_accepted_contract(
model,
accepted_decode_contract,
patch,
)?;
let decoded = vec![None; payloads.contract.field_count()];
Ok(Self { payloads, decoded })
}
}
impl SlotReader for SerializedPatchSlotReader<'_> {
fn generated_compatible_field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
self.payloads.generated_compatible_field_model(slot)
}
fn has(&self, slot: usize) -> bool {
self.payloads.has(slot)
}
fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
self.payloads.get(slot)
}
fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
let Some(raw_value) = self.get_bytes(slot) else {
return Ok(None);
};
let crate::model::field::LeafCodec::Scalar(_) =
self.payloads.contract.field_leaf_codec(slot)?
else {
return Ok(None);
};
decode_scalar_slot_value_from_row_contract(
&self.payloads.contract,
slot,
raw_value,
"accepted serialized structural patch scalar read reached non-scalar slot",
"generated serialized structural patch scalar read reached non-scalar slot",
)
.map(Some)
}
fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
if slot >= self.decoded.len() {
return Ok(None);
}
if self.decoded[slot].is_none()
&& let Some(raw_value) = self.get_bytes(slot)
{
self.decoded[slot] = Some(decode_runtime_value_from_row_contract(
&self.payloads.contract,
slot,
raw_value,
)?);
}
Ok(self.decoded[slot].clone())
}
}
#[cfg(test)]
pub(in crate::db) fn materialize_entity_from_serialized_structural_patch_for_generated_model_for_test<
E,
>(
patch: &SerializedStructuralPatch,
) -> Result<E, InternalError>
where
E: PersistedRow,
{
let mut slots = SerializedPatchSlotReader::new(E::MODEL, patch)?;
E::materialize_from_slots(&mut slots)
}
pub(in crate::db) fn materialize_entity_from_serialized_structural_patch_with_accepted_contract<E>(
patch: &SerializedStructuralPatch,
accepted_decode_contract: AcceptedRowDecodeContract,
) -> Result<E, InternalError>
where
E: PersistedRow,
{
let mut slots = SerializedPatchSlotReader::new_with_accepted_contract(
E::MODEL,
accepted_decode_contract,
patch,
)?;
E::materialize_from_slots(&mut slots)
}
#[cfg(test)]
pub(in crate::db) fn canonical_row_from_complete_serialized_structural_patch_for_generated_model_for_test(
model: &'static EntityModel,
patch: &SerializedStructuralPatch,
) -> Result<CanonicalRow, InternalError> {
let patch_payloads = SerializedPatchPayloads::new_for_generated_model_for_test(model, patch)?;
canonical_row_from_payload_source(model, |slot| patch_payloads.required_complete_payload(slot))
}
#[cfg(test)]
pub(in crate::db) fn canonical_row_from_entity_for_generated_model_for_test<E>(
entity: &E,
) -> Result<CanonicalRow, InternalError>
where
E: PersistedRow,
{
let serialized_slots =
serialize_entity_slots_as_complete_serialized_patch_for_generated_model_for_test(entity)?;
canonical_row_from_complete_serialized_structural_patch_for_generated_model_for_test(
E::MODEL,
&serialized_slots,
)
}
pub(in crate::db) fn canonical_row_from_entity_with_accepted_contract<E>(
entity_path: &'static str,
accepted_decode_contract: AcceptedRowDecodeContract,
entity: &E,
) -> Result<CanonicalRow, InternalError>
where
E: PersistedRow + EntityValue,
{
let contract =
StructuralRowContract::from_accepted_decode_contract(entity_path, accepted_decode_contract);
canonical_row_from_runtime_value_source_with_accepted_contract(&contract, |slot| {
entity
.get_value_by_index(slot)
.map(Cow::Owned)
.ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"accepted entity row emission missing slot {slot} for entity '{}'",
contract.entity_path()
))
})
})
}
#[cfg(test)]
fn canonical_row_from_structural_slot_reader_with_generated_contract(
row_fields: &StructuralSlotReader<'_>,
) -> Result<CanonicalRow, InternalError> {
canonical_row_from_runtime_value_source_with_generated_contract(row_fields.contract(), |slot| {
structural_slot_reader_value(row_fields, slot)
})
}
pub(in crate::db) fn canonical_row_from_structural_slot_reader_with_accepted_contract(
row_fields: &StructuralSlotReader<'_>,
) -> Result<CanonicalRow, InternalError> {
canonical_row_from_runtime_value_source_with_accepted_contract(row_fields.contract(), |slot| {
structural_slot_reader_value(row_fields, slot)
})
}
pub(in crate::db) fn canonical_row_from_raw_row_with_structural_contract(
raw_row: &RawRow,
contract: StructuralRowContract,
) -> Result<CanonicalRow, InternalError> {
let row_fields = StructuralSlotReader::from_raw_row_with_validated_contract(raw_row, contract)?;
if row_fields.has_accepted_decode_contract() {
return canonical_row_from_structural_slot_reader_with_accepted_contract(&row_fields);
}
#[cfg(test)]
{
canonical_row_from_structural_slot_reader_with_generated_contract(&row_fields)
}
#[cfg(not(test))]
Err(InternalError::store_invariant(format!(
"raw row canonicalization requires accepted row contract for entity '{}'",
row_fields.contract().entity_path(),
)))
}
pub(in crate::db) fn canonical_row_from_raw_row_with_accepted_decode_contract(
entity_path: &'static str,
accepted_decode_contract: AcceptedRowDecodeContract,
raw_row: &RawRow,
) -> Result<CanonicalRow, InternalError> {
let contract =
StructuralRowContract::from_accepted_decode_contract(entity_path, accepted_decode_contract);
canonical_row_from_raw_row_with_structural_contract(raw_row, contract)
}
pub(in crate::db) const fn canonical_row_from_stored_raw_row(raw_row: RawRow) -> CanonicalRow {
CanonicalRow::from_canonical_raw_row(raw_row)
}
pub(in crate::db) fn serialize_structural_patch_fields_with_accepted_contract(
entity_path: &'static str,
accepted_decode_contract: AcceptedRowDecodeContract,
patch: &StructuralPatch,
) -> Result<SerializedStructuralPatch, InternalError> {
let contract =
StructuralRowContract::from_accepted_decode_contract(entity_path, accepted_decode_contract);
serialize_structural_patch_fields_for_accepted_contract(&contract, patch)
}
pub(in crate::db) fn serialize_complete_structural_patch_fields_with_accepted_contract(
entity_path: &'static str,
accepted_decode_contract: AcceptedRowDecodeContract,
patch: &StructuralPatch,
) -> Result<SerializedStructuralPatch, InternalError> {
let contract =
StructuralRowContract::from_accepted_decode_contract(entity_path, accepted_decode_contract);
serialize_complete_structural_patch_fields_for_accepted_contract(&contract, patch)
}
fn serialize_structural_patch_fields_for_accepted_contract(
contract: &StructuralRowContract,
patch: &StructuralPatch,
) -> Result<SerializedStructuralPatch, InternalError> {
if patch.is_empty() {
return Ok(SerializedStructuralPatch::default());
}
let mut entries = Vec::with_capacity(patch.entries().len());
for entry in patch.entries() {
let slot = entry.slot();
let field = contract.required_accepted_field_decode_contract(slot.index())?;
let payload = encode_runtime_value_for_accepted_field_contract(field, entry.value())?;
entries.push(SerializedStructuralFieldUpdate::new(slot, payload));
}
Ok(SerializedStructuralPatch::new(entries))
}
fn serialize_complete_structural_patch_fields_for_accepted_contract(
contract: &StructuralRowContract,
patch: &StructuralPatch,
) -> Result<SerializedStructuralPatch, InternalError> {
let mut payloads = vec![None; contract.field_count()];
for entry in patch.entries() {
let slot = entry.slot().index();
let field = contract.required_accepted_field_decode_contract(slot)?;
let payload = encode_runtime_value_for_accepted_field_contract(field, entry.value())?;
payloads[slot] = Some(payload);
}
for (slot, payload) in payloads.iter_mut().enumerate() {
if payload.is_some() {
continue;
}
let field = contract.required_accepted_field_decode_contract(slot)?;
let value = contract.missing_slot_value(slot)?;
*payload = Some(encode_runtime_value_for_accepted_field_contract(
field, &value,
)?);
}
let entries = payloads
.into_iter()
.enumerate()
.map(|(slot, payload)| {
let payload = payload.ok_or_else(|| {
InternalError::persisted_row_slot_lookup_out_of_bounds(contract.entity_path(), slot)
})?;
Ok(SerializedStructuralFieldUpdate::new(
FieldSlot::from_validated_index(slot),
payload,
))
})
.collect::<Result<Vec<_>, InternalError>>()?;
Ok(SerializedStructuralPatch::new(entries))
}
#[cfg(test)]
pub(in crate::db) fn serialize_entity_slots_as_complete_serialized_patch_for_generated_model_for_test<
E,
>(
entity: &E,
) -> Result<SerializedStructuralPatch, InternalError>
where
E: PersistedRow,
{
let mut writer = CompleteSerializedPatchWriter::for_generated_model_for_test(E::MODEL);
entity.write_slots(&mut writer)?;
writer.finish_dense_slot_image()
}
pub(in crate::db) fn apply_serialized_structural_patch_to_raw_row_with_accepted_contract(
entity_path: &'static str,
accepted_decode_contract: AcceptedRowDecodeContract,
raw_row: &RawRow,
patch: &SerializedStructuralPatch,
) -> Result<CanonicalRow, InternalError> {
let contract =
StructuralRowContract::from_accepted_decode_contract(entity_path, accepted_decode_contract);
let row_fields =
StructuralSlotReader::from_raw_row_with_validated_contract(raw_row, contract.clone())?;
let mut values = Vec::with_capacity(contract.field_count());
for slot in 0..contract.field_count() {
values.push(row_fields.required_cached_value(slot)?.clone());
}
for entry in patch.entries() {
let slot = entry.slot().index();
let value = values.get_mut(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is outside the accepted structural after-image for entity '{}'",
contract.entity_path()
))
})?;
*value = decode_runtime_value_from_row_contract(&contract, slot, entry.payload())?;
}
canonical_row_from_runtime_value_source_with_accepted_contract(&contract, |slot| {
values.get(slot).map(Cow::Borrowed).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from accepted structural after-image for entity '{}'",
contract.entity_path()
))
})
})
}
fn structural_slot_reader_value<'a>(
row_fields: &'a StructuralSlotReader<'_>,
slot: usize,
) -> Result<Cow<'a, Value>, InternalError> {
row_fields
.required_cached_value(slot)
.map(Cow::Borrowed)
.map_err(|_| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the structural value cache for entity '{}'",
row_fields.contract().entity_path()
))
})
}