use crate::{
db::data::{CanonicalRow, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes},
error::InternalError,
model::entity::EntityModel,
value::Value,
};
use std::borrow::Cow;
use crate::db::data::persisted_row::{
codec::ScalarSlotValueRef,
contract::{
decode_slot_value_from_bytes, dense_canonical_slot_image_from_payload_source,
dense_canonical_slot_image_from_value_source, emit_raw_row_from_slot_payloads,
encode_slot_value_from_value, field_model_for_slot, serialized_patch_payload_by_slot,
},
reader::StructuralSlotReader,
types::{PersistedRow, SerializedFieldUpdate, SerializedUpdatePatch, SlotReader, UpdatePatch},
writer::{CompleteSerializedPatchWriter, SlotBufferWriter},
};
fn dense_canonical_slot_image_from_complete_serialized_patch(
model: &'static EntityModel,
patch: &SerializedUpdatePatch,
) -> Result<Vec<Vec<u8>>, InternalError> {
let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
dense_canonical_slot_image_from_payload_source(model, |slot| {
patch_payloads[slot].ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"serialized patch did not emit slot {slot} for entity '{}'",
model.path()
))
})
})
}
struct SerializedPatchSlotReader<'a> {
model: &'static EntityModel,
payloads: Vec<Option<&'a [u8]>>,
decoded: Vec<Option<Value>>,
}
impl<'a> SerializedPatchSlotReader<'a> {
fn new(
model: &'static EntityModel,
patch: &'a SerializedUpdatePatch,
) -> Result<Self, InternalError> {
let payloads = serialized_patch_payload_by_slot(model, patch)?;
let decoded = vec![None; model.fields().len()];
Ok(Self {
model,
payloads,
decoded,
})
}
}
impl SlotReader for SerializedPatchSlotReader<'_> {
fn model(&self) -> &'static EntityModel {
self.model
}
fn has(&self, slot: usize) -> bool {
self.payloads.get(slot).is_some_and(Option::is_some)
}
fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
self.payloads.get(slot).copied().flatten()
}
fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
let Some(raw_value) = self.get_bytes(slot) else {
return Ok(None);
};
let field = field_model_for_slot(self.model, slot)?;
let crate::model::field::LeafCodec::Scalar(codec) = field.leaf_codec() else {
return Ok(None);
};
crate::db::data::persisted_row::codec::decode_scalar_slot_value(
raw_value,
codec,
field.name(),
)
.map(Some)
}
fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
if slot >= self.decoded.len() {
return Ok(None);
}
if self.decoded[slot].is_none()
&& let Some(raw_value) = self.get_bytes(slot)
{
self.decoded[slot] = Some(decode_slot_value_from_bytes(self.model, slot, raw_value)?);
}
Ok(self.decoded[slot].clone())
}
}
pub(in crate::db) fn materialize_entity_from_serialized_update_patch<E>(
patch: &SerializedUpdatePatch,
) -> Result<E, InternalError>
where
E: PersistedRow,
{
let mut slots = SerializedPatchSlotReader::new(E::MODEL, patch)?;
E::materialize_from_slots(&mut slots)
}
pub(in crate::db) fn canonical_row_from_complete_serialized_update_patch(
model: &'static EntityModel,
patch: &SerializedUpdatePatch,
) -> Result<CanonicalRow, InternalError> {
let slot_payloads = dense_canonical_slot_image_from_complete_serialized_patch(model, patch)?;
emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
}
pub(in crate::db) fn canonical_row_from_entity<E>(entity: &E) -> Result<CanonicalRow, InternalError>
where
E: PersistedRow,
{
let mut writer = SlotBufferWriter::for_model(E::MODEL);
entity.write_slots(&mut writer)?;
let encoded = crate::db::codec::serialize_row_payload(writer.finish()?)?;
let raw_row = RawRow::from_untrusted_bytes(encoded).map_err(InternalError::from)?;
Ok(CanonicalRow::from_canonical_raw_row(raw_row))
}
pub(in crate::db) fn canonical_row_from_structural_slot_reader(
row_fields: &StructuralSlotReader<'_>,
) -> Result<CanonicalRow, InternalError> {
let slot_payloads = dense_canonical_slot_image_from_value_source(row_fields.model(), |slot| {
row_fields
.required_cached_value(slot)
.map(Cow::Borrowed)
.map_err(|_| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the structural value cache for entity '{}'",
row_fields.model().path()
))
})
})?;
emit_raw_row_from_slot_payloads(row_fields.model(), slot_payloads.as_slice())
}
pub(in crate::db) fn canonical_row_from_raw_row(
model: &'static EntityModel,
raw_row: &RawRow,
) -> Result<CanonicalRow, InternalError> {
let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
.map_err(StructuralRowDecodeError::into_internal_error)?;
let slot_payloads = dense_canonical_slot_image_from_payload_source(model, |slot| {
field_bytes.field(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the baseline row for entity '{}'",
model.path()
))
})
})?;
emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
}
pub(in crate::db) const fn canonical_row_from_stored_raw_row(raw_row: RawRow) -> CanonicalRow {
CanonicalRow::from_canonical_raw_row(raw_row)
}
#[cfg(test)]
pub(in crate::db) fn apply_update_patch_to_raw_row(
model: &'static EntityModel,
raw_row: &RawRow,
patch: &UpdatePatch,
) -> Result<CanonicalRow, InternalError> {
let serialized_patch = serialize_update_patch_fields(model, patch)?;
apply_serialized_update_patch_to_raw_row(model, raw_row, &serialized_patch)
}
pub(in crate::db) fn serialize_update_patch_fields(
model: &'static EntityModel,
patch: &UpdatePatch,
) -> Result<SerializedUpdatePatch, InternalError> {
if patch.is_empty() {
return Ok(SerializedUpdatePatch::default());
}
let mut entries = Vec::with_capacity(patch.entries().len());
for entry in patch.entries() {
let slot = entry.slot();
let payload = encode_slot_value_from_value(model, slot.index(), entry.value())?;
entries.push(SerializedFieldUpdate::new(slot, payload));
}
Ok(SerializedUpdatePatch::new(entries))
}
pub(in crate::db) fn serialize_entity_slots_as_complete_serialized_patch<E>(
entity: &E,
) -> Result<SerializedUpdatePatch, InternalError>
where
E: PersistedRow,
{
let mut writer = CompleteSerializedPatchWriter::for_model(E::MODEL);
entity.write_slots(&mut writer)?;
writer.finish_dense_slot_image()
}
pub(in crate::db) fn apply_serialized_update_patch_to_raw_row(
model: &'static EntityModel,
raw_row: &RawRow,
patch: &SerializedUpdatePatch,
) -> Result<CanonicalRow, InternalError> {
if patch.is_empty() {
return canonical_row_from_raw_row(model, raw_row);
}
let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
.map_err(StructuralRowDecodeError::into_internal_error)?;
let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
let slot_payloads = dense_canonical_slot_image_from_payload_source(model, |slot| {
if let Some(payload) = patch_payloads[slot] {
Ok(payload)
} else {
field_bytes.field(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the baseline row for entity '{}'",
model.path()
))
})
}
})?;
emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
}