mod codec;
use crate::{
db::{
codec::serialize_row_payload,
data::{
CanonicalRow, DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
decode_structural_value_storage_bytes, validate_structural_field_by_kind_bytes,
validate_structural_value_storage_bytes,
},
scalar_expr::compile_scalar_literal_expr_value,
schema::{field_type_from_model_kind, literal_matches_type},
},
error::InternalError,
model::{
entity::{EntityModel, resolve_field_slot, resolve_primary_key_slot},
field::{FieldKind, FieldModel, FieldStorageDecode, LeafCodec},
},
serialize::serialize,
traits::EntityKind,
value::{StorageKey, Value, ValueEnum},
};
use serde_cbor::{Value as CborValue, value::to_value as to_cbor_value};
#[cfg(any(test, feature = "structural-read-metrics"))]
use std::cell::{Cell, RefCell};
use std::{borrow::Cow, cell::OnceCell, cmp::Ordering, collections::BTreeMap};
use self::codec::{decode_scalar_slot_value, encode_scalar_slot_value};
pub use self::codec::{
PersistedScalar, ScalarSlotValueRef, ScalarValueRef, decode_persisted_custom_many_slot_payload,
decode_persisted_custom_slot_payload, decode_persisted_non_null_slot_payload,
decode_persisted_option_scalar_slot_payload, decode_persisted_option_slot_payload,
decode_persisted_scalar_slot_payload, decode_persisted_slot_payload,
encode_persisted_custom_many_slot_payload, encode_persisted_custom_slot_payload,
encode_persisted_option_scalar_slot_payload, encode_persisted_scalar_slot_payload,
encode_persisted_slot_payload,
};
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct FieldSlot {
index: usize,
}
#[allow(dead_code)]
impl FieldSlot {
#[must_use]
pub(in crate::db) fn resolve(model: &'static EntityModel, field_name: &str) -> Option<Self> {
resolve_field_slot(model, field_name).map(|index| Self { index })
}
pub(in crate::db) fn from_index(
model: &'static EntityModel,
index: usize,
) -> Result<Self, InternalError> {
field_model_for_slot(model, index)?;
Ok(Self { index })
}
#[must_use]
pub(in crate::db) const fn index(self) -> usize {
self.index
}
}
#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct FieldUpdate {
slot: FieldSlot,
value: Value,
}
#[allow(dead_code)]
impl FieldUpdate {
#[must_use]
pub(in crate::db) const fn new(slot: FieldSlot, value: Value) -> Self {
Self { slot, value }
}
#[must_use]
pub(in crate::db) const fn slot(&self) -> FieldSlot {
self.slot
}
#[must_use]
pub(in crate::db) const fn value(&self) -> &Value {
&self.value
}
}
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct UpdatePatch {
entries: Vec<FieldUpdate>,
}
impl UpdatePatch {
#[must_use]
pub const fn new() -> Self {
Self {
entries: Vec::new(),
}
}
#[must_use]
pub(in crate::db) fn set(mut self, slot: FieldSlot, value: Value) -> Self {
self.entries.push(FieldUpdate::new(slot, value));
self
}
pub fn set_field(
self,
model: &'static EntityModel,
field_name: &str,
value: Value,
) -> Result<Self, InternalError> {
let Some(slot) = FieldSlot::resolve(model, field_name) else {
return Err(InternalError::mutation_structural_field_unknown(
model.path(),
field_name,
));
};
Ok(self.set(slot, value))
}
#[must_use]
pub(in crate::db) const fn entries(&self) -> &[FieldUpdate] {
self.entries.as_slice()
}
#[must_use]
pub(in crate::db) const fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct SerializedFieldUpdate {
slot: FieldSlot,
payload: Vec<u8>,
}
#[allow(dead_code)]
impl SerializedFieldUpdate {
#[must_use]
pub(in crate::db) const fn new(slot: FieldSlot, payload: Vec<u8>) -> Self {
Self { slot, payload }
}
#[must_use]
pub(in crate::db) const fn slot(&self) -> FieldSlot {
self.slot
}
#[must_use]
pub(in crate::db) const fn payload(&self) -> &[u8] {
self.payload.as_slice()
}
}
#[allow(dead_code)]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SerializedUpdatePatch {
entries: Vec<SerializedFieldUpdate>,
}
#[allow(dead_code)]
impl SerializedUpdatePatch {
#[must_use]
pub(in crate::db) const fn new(entries: Vec<SerializedFieldUpdate>) -> Self {
Self { entries }
}
#[must_use]
pub(in crate::db) const fn entries(&self) -> &[SerializedFieldUpdate] {
self.entries.as_slice()
}
#[must_use]
pub(in crate::db) const fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
pub trait SlotReader {
fn model(&self) -> &'static EntityModel;
fn has(&self, slot: usize) -> bool;
fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
}
pub(in crate::db) trait CanonicalSlotReader: SlotReader {
fn required_bytes(&self, slot: usize) -> Result<&[u8], InternalError> {
let field = field_model_for_slot(self.model(), slot)?;
self.get_bytes(slot)
.ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
}
fn required_scalar(&self, slot: usize) -> Result<ScalarSlotValueRef<'_>, InternalError> {
let field = field_model_for_slot(self.model(), slot)?;
debug_assert!(matches!(field.leaf_codec(), LeafCodec::Scalar(_)));
self.get_scalar(slot)?
.ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
}
fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
decode_slot_value_from_bytes(self.model(), slot, self.required_bytes(slot)?)
}
fn required_value_by_contract_cow(&self, slot: usize) -> Result<Cow<'_, Value>, InternalError> {
Ok(Cow::Owned(self.required_value_by_contract(slot)?))
}
}
pub trait SlotWriter {
fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
fn write_scalar(
&mut self,
slot: usize,
value: ScalarSlotValueRef<'_>,
) -> Result<(), InternalError> {
let payload = encode_scalar_slot_value(value);
self.write_slot(slot, Some(payload.as_slice()))
}
}
fn slot_cell_mut<T>(slots: &mut [T], slot: usize) -> Result<&mut T, InternalError> {
slots.get_mut(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(
format!("slot {slot} is outside the row layout",),
)
})
}
fn required_slot_payload_bytes<'a>(
model: &'static EntityModel,
writer_label: &str,
slot: usize,
payload: Option<&'a [u8]>,
) -> Result<&'a [u8], InternalError> {
payload.ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"{writer_label} cannot clear slot {slot} for entity '{}'",
model.path()
))
})
}
fn encode_slot_payload_from_parts(
slot_count: usize,
slot_table: &[(u32, u32)],
payload_bytes: &[u8],
) -> Result<Vec<u8>, InternalError> {
let field_count = u16::try_from(slot_count).map_err(|_| {
InternalError::persisted_row_encode_failed(format!(
"field count {slot_count} exceeds u16 slot table capacity",
))
})?;
let mut encoded = Vec::with_capacity(
usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
);
encoded.extend_from_slice(&field_count.to_be_bytes());
for (start, len) in slot_table {
encoded.extend_from_slice(&start.to_be_bytes());
encoded.extend_from_slice(&len.to_be_bytes());
}
encoded.extend_from_slice(payload_bytes);
Ok(encoded)
}
pub trait PersistedRow: EntityKind + Sized {
fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
where
Self: crate::traits::FieldProjection,
{
let entity = Self::materialize_from_slots(slots)?;
Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
}
}
#[cfg(test)]
pub(in crate::db) fn decode_slot_value_by_contract(
slots: &dyn SlotReader,
slot: usize,
) -> Result<Option<Value>, InternalError> {
let Some(raw_value) = slots.get_bytes(slot) else {
return Ok(None);
};
decode_slot_value_from_bytes(slots.model(), slot, raw_value).map(Some)
}
pub(in crate::db) fn decode_slot_value_from_bytes(
model: &'static EntityModel,
slot: usize,
raw_value: &[u8],
) -> Result<Value, InternalError> {
let field = field_model_for_slot(model, slot)?;
decode_slot_value_for_field(field, raw_value)
}
fn decode_slot_value_for_field(
field: &FieldModel,
raw_value: &[u8],
) -> Result<Value, InternalError> {
match field.leaf_codec() {
LeafCodec::Scalar(codec) => match decode_scalar_slot_value(raw_value, codec, field.name())?
{
ScalarSlotValueRef::Null => Ok(Value::Null),
ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
},
LeafCodec::CborFallback => decode_non_scalar_slot_value(raw_value, field),
}
}
#[allow(dead_code)]
pub(in crate::db) fn encode_slot_value_from_value(
model: &'static EntityModel,
slot: usize,
value: &Value,
) -> Result<Vec<u8>, InternalError> {
let field = field_model_for_slot(model, slot)?;
ensure_slot_value_matches_field_contract(field, value)?;
match field.storage_decode() {
FieldStorageDecode::Value => serialize(value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field.name(), err)),
FieldStorageDecode::ByKind => match field.leaf_codec() {
LeafCodec::Scalar(_) => {
let scalar = compile_scalar_literal_expr_value(value).ok_or_else(|| {
InternalError::persisted_row_field_encode_failed(
field.name(),
format!(
"field kind {:?} requires a scalar runtime value, found {value:?}",
field.kind()
),
)
})?;
Ok(encode_scalar_slot_value(scalar.as_slot_value_ref()))
}
LeafCodec::CborFallback => {
encode_structural_field_bytes_by_kind(field.kind(), value, field.name())
}
},
}
}
fn canonicalize_slot_payload(
model: &'static EntityModel,
slot: usize,
raw_value: &[u8],
) -> Result<Vec<u8>, InternalError> {
let value = decode_slot_value_from_bytes(model, slot, raw_value)?;
encode_slot_value_from_value(model, slot, &value)
}
fn dense_canonical_slot_image_from_payload_source<'a, F>(
model: &'static EntityModel,
mut payload_for_slot: F,
) -> Result<Vec<Vec<u8>>, InternalError>
where
F: FnMut(usize) -> Result<&'a [u8], InternalError>,
{
let mut slot_payloads = Vec::with_capacity(model.fields().len());
for slot in 0..model.fields().len() {
let payload = payload_for_slot(slot)?;
slot_payloads.push(canonicalize_slot_payload(model, slot, payload)?);
}
Ok(slot_payloads)
}
fn dense_canonical_slot_image_from_value_source<'a, F>(
model: &'static EntityModel,
mut value_for_slot: F,
) -> Result<Vec<Vec<u8>>, InternalError>
where
F: FnMut(usize) -> Result<Cow<'a, Value>, InternalError>,
{
let mut slot_payloads = Vec::with_capacity(model.fields().len());
for slot in 0..model.fields().len() {
let value = value_for_slot(slot)?;
slot_payloads.push(encode_slot_value_from_value(model, slot, value.as_ref())?);
}
Ok(slot_payloads)
}
fn emit_raw_row_from_slot_payloads(
model: &'static EntityModel,
slot_payloads: &[Vec<u8>],
) -> Result<CanonicalRow, InternalError> {
if slot_payloads.len() != model.fields().len() {
return Err(InternalError::persisted_row_encode_failed(format!(
"canonical slot image expected {} slots for entity '{}', found {}",
model.fields().len(),
model.path(),
slot_payloads.len()
)));
}
let payload_capacity = slot_payloads
.iter()
.try_fold(0usize, |len, payload| len.checked_add(payload.len()))
.ok_or_else(|| {
InternalError::persisted_row_encode_failed(
"canonical slot image payload length overflow",
)
})?;
let mut payload_bytes = Vec::with_capacity(payload_capacity);
let mut slot_table = Vec::with_capacity(slot_payloads.len());
for (slot, payload) in slot_payloads.iter().enumerate() {
let start = u32::try_from(payload_bytes.len()).map_err(|_| {
InternalError::persisted_row_encode_failed(format!(
"canonical slot payload start exceeds u32 range: slot={slot}",
))
})?;
let len = u32::try_from(payload.len()).map_err(|_| {
InternalError::persisted_row_encode_failed(format!(
"canonical slot payload length exceeds u32 range: slot={slot}",
))
})?;
payload_bytes.extend_from_slice(payload.as_slice());
slot_table.push((start, len));
}
let row_payload =
encode_slot_payload_from_parts(slot_payloads.len(), slot_table.as_slice(), &payload_bytes)?;
let encoded = serialize_row_payload(row_payload)?;
let raw_row = RawRow::from_untrusted_bytes(encoded).map_err(InternalError::from)?;
Ok(CanonicalRow::from_canonical_raw_row(raw_row))
}
fn dense_canonical_slot_image_from_serialized_patch(
model: &'static EntityModel,
patch: &SerializedUpdatePatch,
) -> Result<Vec<Vec<u8>>, InternalError> {
let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
dense_canonical_slot_image_from_payload_source(model, |slot| {
patch_payloads[slot].ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"serialized patch did not emit slot {slot} for entity '{}'",
model.path()
))
})
})
}
pub(in crate::db) fn canonical_row_from_serialized_update_patch(
model: &'static EntityModel,
patch: &SerializedUpdatePatch,
) -> Result<CanonicalRow, InternalError> {
let slot_payloads = dense_canonical_slot_image_from_serialized_patch(model, patch)?;
emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
}
pub(in crate::db) fn canonical_row_from_entity<E>(entity: &E) -> Result<CanonicalRow, InternalError>
where
E: PersistedRow,
{
let mut writer = SlotBufferWriter::for_model(E::MODEL);
entity.write_slots(&mut writer)?;
let encoded = serialize_row_payload(writer.finish()?)?;
let raw_row = RawRow::from_untrusted_bytes(encoded).map_err(InternalError::from)?;
Ok(CanonicalRow::from_canonical_raw_row(raw_row))
}
pub(in crate::db) fn canonical_row_from_structural_slot_reader(
row_fields: &StructuralSlotReader<'_>,
) -> Result<CanonicalRow, InternalError> {
let slot_payloads = dense_canonical_slot_image_from_value_source(row_fields.model, |slot| {
row_fields
.required_cached_value(slot)
.map(Cow::Borrowed)
.map_err(|_| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the structural value cache for entity '{}'",
row_fields.model.path()
))
})
})?;
emit_raw_row_from_slot_payloads(row_fields.model, slot_payloads.as_slice())
}
pub(in crate::db) fn canonical_row_from_raw_row(
model: &'static EntityModel,
raw_row: &RawRow,
) -> Result<CanonicalRow, InternalError> {
let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
.map_err(StructuralRowDecodeError::into_internal_error)?;
let slot_payloads = dense_canonical_slot_image_from_payload_source(model, |slot| {
field_bytes.field(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the baseline row for entity '{}'",
model.path()
))
})
})?;
emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
}
pub(in crate::db) const fn canonical_row_from_stored_raw_row(raw_row: RawRow) -> CanonicalRow {
CanonicalRow::from_canonical_raw_row(raw_row)
}
#[allow(dead_code)]
pub(in crate::db) fn apply_update_patch_to_raw_row(
model: &'static EntityModel,
raw_row: &RawRow,
patch: &UpdatePatch,
) -> Result<CanonicalRow, InternalError> {
let serialized_patch = serialize_update_patch_fields(model, patch)?;
apply_serialized_update_patch_to_raw_row(model, raw_row, &serialized_patch)
}
#[allow(dead_code)]
pub(in crate::db) fn serialize_update_patch_fields(
model: &'static EntityModel,
patch: &UpdatePatch,
) -> Result<SerializedUpdatePatch, InternalError> {
if patch.is_empty() {
return Ok(SerializedUpdatePatch::default());
}
let mut entries = Vec::with_capacity(patch.entries().len());
for entry in patch.entries() {
let slot = entry.slot();
let payload = encode_slot_value_from_value(model, slot.index(), entry.value())?;
entries.push(SerializedFieldUpdate::new(slot, payload));
}
Ok(SerializedUpdatePatch::new(entries))
}
#[allow(dead_code)]
pub(in crate::db) fn serialize_entity_slots_as_update_patch<E>(
entity: &E,
) -> Result<SerializedUpdatePatch, InternalError>
where
E: PersistedRow,
{
let mut writer = SerializedPatchWriter::for_model(E::MODEL);
entity.write_slots(&mut writer)?;
writer.finish_complete()
}
#[allow(dead_code)]
pub(in crate::db) fn apply_serialized_update_patch_to_raw_row(
model: &'static EntityModel,
raw_row: &RawRow,
patch: &SerializedUpdatePatch,
) -> Result<CanonicalRow, InternalError> {
if patch.is_empty() {
return canonical_row_from_raw_row(model, raw_row);
}
let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
.map_err(StructuralRowDecodeError::into_internal_error)?;
let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
let slot_payloads = dense_canonical_slot_image_from_payload_source(model, |slot| {
if let Some(payload) = patch_payloads[slot] {
Ok(payload)
} else {
field_bytes.field(slot).ok_or_else(|| {
InternalError::persisted_row_encode_failed(format!(
"slot {slot} is missing from the baseline row for entity '{}'",
model.path()
))
})
}
})?;
emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
}
fn decode_non_scalar_slot_value(
raw_value: &[u8],
field: &FieldModel,
) -> Result<Value, InternalError> {
let decoded = match field.storage_decode() {
crate::model::field::FieldStorageDecode::ByKind => {
decode_structural_field_by_kind_bytes(raw_value, field.kind())
}
crate::model::field::FieldStorageDecode::Value => {
decode_structural_value_storage_bytes(raw_value)
}
};
decoded.map_err(|err| {
InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
})
}
fn validate_non_scalar_slot_value(
raw_value: &[u8],
field: &FieldModel,
) -> Result<(), InternalError> {
let validated = match field.storage_decode() {
crate::model::field::FieldStorageDecode::ByKind => {
validate_structural_field_by_kind_bytes(raw_value, field.kind())
}
crate::model::field::FieldStorageDecode::Value => {
validate_structural_value_storage_bytes(raw_value)
}
};
validated.map_err(|err| {
InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
})
}
#[allow(dead_code)]
fn ensure_slot_value_matches_field_contract(
field: &FieldModel,
value: &Value,
) -> Result<(), InternalError> {
if matches!(value, Value::Null) {
if field.nullable() {
return Ok(());
}
return Err(InternalError::persisted_row_field_encode_failed(
field.name(),
"required field cannot store null",
));
}
if matches!(field.storage_decode(), FieldStorageDecode::Value) {
if !storage_value_matches_field_kind(field.kind(), value) {
return Err(InternalError::persisted_row_field_encode_failed(
field.name(),
format!(
"field kind {:?} does not accept runtime value {value:?}",
field.kind()
),
));
}
ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
return ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value);
}
let field_type = field_type_from_model_kind(&field.kind());
if !literal_matches_type(value, &field_type) {
return Err(InternalError::persisted_row_field_encode_failed(
field.name(),
format!(
"field kind {:?} does not accept runtime value {value:?}",
field.kind()
),
));
}
ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value)
}
fn storage_value_matches_field_kind(kind: FieldKind, value: &Value) -> bool {
match (kind, value) {
(FieldKind::Account, Value::Account(_))
| (FieldKind::Blob, Value::Blob(_))
| (FieldKind::Bool, Value::Bool(_))
| (FieldKind::Date, Value::Date(_))
| (FieldKind::Decimal { .. }, Value::Decimal(_))
| (FieldKind::Duration, Value::Duration(_))
| (FieldKind::Enum { .. }, Value::Enum(_))
| (FieldKind::Float32, Value::Float32(_))
| (FieldKind::Float64, Value::Float64(_))
| (FieldKind::Int, Value::Int(_))
| (FieldKind::Int128, Value::Int128(_))
| (FieldKind::IntBig, Value::IntBig(_))
| (FieldKind::Principal, Value::Principal(_))
| (FieldKind::Subaccount, Value::Subaccount(_))
| (FieldKind::Text, Value::Text(_))
| (FieldKind::Timestamp, Value::Timestamp(_))
| (FieldKind::Uint, Value::Uint(_))
| (FieldKind::Uint128, Value::Uint128(_))
| (FieldKind::UintBig, Value::UintBig(_))
| (FieldKind::Ulid, Value::Ulid(_))
| (FieldKind::Unit, Value::Unit)
| (FieldKind::Structured { .. }, Value::List(_) | Value::Map(_)) => true,
(FieldKind::Relation { key_kind, .. }, value) => {
storage_value_matches_field_kind(*key_kind, value)
}
(FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => items
.iter()
.all(|item| storage_value_matches_field_kind(*inner, item)),
(FieldKind::Map { key, value }, Value::Map(entries)) => {
if Value::validate_map_entries(entries.as_slice()).is_err() {
return false;
}
entries.iter().all(|(entry_key, entry_value)| {
storage_value_matches_field_kind(*key, entry_key)
&& storage_value_matches_field_kind(*value, entry_value)
})
}
_ => false,
}
}
#[allow(dead_code)]
fn ensure_decimal_scale_matches(
field_name: &str,
kind: FieldKind,
value: &Value,
) -> Result<(), InternalError> {
if matches!(value, Value::Null) {
return Ok(());
}
match (kind, value) {
(FieldKind::Decimal { scale }, Value::Decimal(decimal)) => {
if decimal.scale() != scale {
return Err(InternalError::persisted_row_field_encode_failed(
field_name,
format!(
"decimal scale mismatch: expected {scale}, found {}",
decimal.scale()
),
));
}
Ok(())
}
(FieldKind::Relation { key_kind, .. }, value) => {
ensure_decimal_scale_matches(field_name, *key_kind, value)
}
(FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
for item in items {
ensure_decimal_scale_matches(field_name, *inner, item)?;
}
Ok(())
}
(
FieldKind::Map {
key,
value: map_value,
},
Value::Map(entries),
) => {
for (entry_key, entry_value) in entries {
ensure_decimal_scale_matches(field_name, *key, entry_key)?;
ensure_decimal_scale_matches(field_name, *map_value, entry_value)?;
}
Ok(())
}
_ => Ok(()),
}
}
#[allow(dead_code)]
fn ensure_value_is_deterministic_for_storage(
field_name: &str,
kind: FieldKind,
value: &Value,
) -> Result<(), InternalError> {
match (kind, value) {
(FieldKind::Set(_), Value::List(items)) => {
for pair in items.windows(2) {
let [left, right] = pair else {
continue;
};
if Value::canonical_cmp(left, right) != Ordering::Less {
return Err(InternalError::persisted_row_field_encode_failed(
field_name,
"set payload must already be canonical and deduplicated",
));
}
}
Ok(())
}
(FieldKind::Map { .. }, Value::Map(entries)) => {
Value::validate_map_entries(entries.as_slice())
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?;
if !Value::map_entries_are_strictly_canonical(entries.as_slice()) {
return Err(InternalError::persisted_row_field_encode_failed(
field_name,
"map payload must already be canonical and deduplicated",
));
}
Ok(())
}
_ => Ok(()),
}
}
fn serialized_patch_payload_by_slot<'a>(
model: &'static EntityModel,
patch: &'a SerializedUpdatePatch,
) -> Result<Vec<Option<&'a [u8]>>, InternalError> {
let mut payloads = vec![None; model.fields().len()];
for entry in patch.entries() {
let slot = entry.slot().index();
field_model_for_slot(model, slot)?;
payloads[slot] = Some(entry.payload());
}
Ok(payloads)
}
fn encode_structural_field_bytes_by_kind(
kind: FieldKind,
value: &Value,
field_name: &str,
) -> Result<Vec<u8>, InternalError> {
let cbor_value = encode_structural_field_cbor_by_kind(kind, value, field_name)?;
serialize(&cbor_value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
}
fn encode_structural_field_cbor_by_kind(
kind: FieldKind,
value: &Value,
field_name: &str,
) -> Result<CborValue, InternalError> {
match (kind, value) {
(_, Value::Null) => Ok(CborValue::Null),
(FieldKind::Blob, Value::Blob(value)) => Ok(CborValue::Bytes(value.clone())),
(FieldKind::Bool, Value::Bool(value)) => Ok(CborValue::Bool(*value)),
(FieldKind::Text, Value::Text(value)) => Ok(CborValue::Text(value.clone())),
(FieldKind::Int, Value::Int(value)) => Ok(CborValue::Integer(i128::from(*value))),
(FieldKind::Uint, Value::Uint(value)) => Ok(CborValue::Integer(i128::from(*value))),
(FieldKind::Float32, Value::Float32(value)) => to_cbor_value(value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
(FieldKind::Float64, Value::Float64(value)) => to_cbor_value(value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
(FieldKind::Int128, Value::Int128(value)) => to_cbor_value(value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
(FieldKind::Uint128, Value::Uint128(value)) => to_cbor_value(value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
(FieldKind::Ulid, Value::Ulid(value)) => Ok(CborValue::Text(value.to_string())),
(FieldKind::Account, Value::Account(value)) => encode_leaf_cbor_value(value, field_name),
(FieldKind::Date, Value::Date(value)) => encode_leaf_cbor_value(value, field_name),
(FieldKind::Decimal { .. }, Value::Decimal(value)) => {
encode_leaf_cbor_value(value, field_name)
}
(FieldKind::Duration, Value::Duration(value)) => encode_leaf_cbor_value(value, field_name),
(FieldKind::IntBig, Value::IntBig(value)) => encode_leaf_cbor_value(value, field_name),
(FieldKind::Principal, Value::Principal(value)) => {
encode_leaf_cbor_value(value, field_name)
}
(FieldKind::Subaccount, Value::Subaccount(value)) => {
encode_leaf_cbor_value(value, field_name)
}
(FieldKind::Timestamp, Value::Timestamp(value)) => {
encode_leaf_cbor_value(value, field_name)
}
(FieldKind::UintBig, Value::UintBig(value)) => encode_leaf_cbor_value(value, field_name),
(FieldKind::Unit, Value::Unit) => encode_leaf_cbor_value(&(), field_name),
(FieldKind::Relation { key_kind, .. }, value) => {
encode_structural_field_cbor_by_kind(*key_kind, value, field_name)
}
(FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
Ok(CborValue::Array(
items
.iter()
.map(|item| encode_structural_field_cbor_by_kind(*inner, item, field_name))
.collect::<Result<Vec<_>, _>>()?,
))
}
(FieldKind::Map { key, value }, Value::Map(entries)) => {
let mut encoded = BTreeMap::new();
for (entry_key, entry_value) in entries {
encoded.insert(
encode_structural_field_cbor_by_kind(*key, entry_key, field_name)?,
encode_structural_field_cbor_by_kind(*value, entry_value, field_name)?,
);
}
Ok(CborValue::Map(encoded))
}
(FieldKind::Enum { path, variants }, Value::Enum(value)) => {
encode_enum_cbor_value(path, variants, value, field_name)
}
(FieldKind::Structured { .. }, _) => Err(InternalError::persisted_row_field_encode_failed(
field_name,
"structured ByKind field encoding is unsupported",
)),
_ => Err(InternalError::persisted_row_field_encode_failed(
field_name,
format!("field kind {kind:?} does not accept runtime value {value:?}"),
)),
}
}
fn encode_leaf_cbor_value<T>(value: &T, field_name: &str) -> Result<CborValue, InternalError>
where
T: serde::Serialize,
{
to_cbor_value(value)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
}
fn encode_enum_cbor_value(
path: &'static str,
variants: &'static [crate::model::field::EnumVariantModel],
value: &ValueEnum,
field_name: &str,
) -> Result<CborValue, InternalError> {
if let Some(actual_path) = value.path()
&& actual_path != path
{
return Err(InternalError::persisted_row_field_encode_failed(
field_name,
format!("enum path mismatch: expected '{path}', found '{actual_path}'"),
));
}
let Some(payload) = value.payload() else {
return Ok(CborValue::Text(value.variant().to_string()));
};
let Some(variant_model) = variants.iter().find(|item| item.ident() == value.variant()) else {
return Err(InternalError::persisted_row_field_encode_failed(
field_name,
format!(
"unknown enum variant '{}' for path '{path}'",
value.variant()
),
));
};
let Some(payload_kind) = variant_model.payload_kind() else {
return Err(InternalError::persisted_row_field_encode_failed(
field_name,
format!(
"enum variant '{}' does not accept a payload",
value.variant()
),
));
};
let payload_value = match variant_model.payload_storage_decode() {
FieldStorageDecode::ByKind => {
encode_structural_field_cbor_by_kind(*payload_kind, payload, field_name)?
}
FieldStorageDecode::Value => to_cbor_value(payload)
.map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?,
};
let mut encoded = BTreeMap::new();
encoded.insert(CborValue::Text(value.variant().to_string()), payload_value);
Ok(CborValue::Map(encoded))
}
fn field_model_for_slot(
model: &'static EntityModel,
slot: usize,
) -> Result<&'static FieldModel, InternalError> {
model
.fields()
.get(slot)
.ok_or_else(|| InternalError::persisted_row_slot_lookup_out_of_bounds(model.path(), slot))
}
pub(in crate::db) struct SlotBufferWriter {
model: &'static EntityModel,
slots: Vec<SlotBufferSlot>,
}
impl SlotBufferWriter {
pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
Self {
model,
slots: vec![SlotBufferSlot::Missing; model.fields().len()],
}
}
pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
let slot_count = self.slots.len();
let mut payload_bytes = Vec::new();
let mut slot_table = Vec::with_capacity(slot_count);
for (slot, slot_payload) in self.slots.into_iter().enumerate() {
match slot_payload {
SlotBufferSlot::Set(bytes) => {
let start = u32::try_from(payload_bytes.len()).map_err(|_| {
InternalError::persisted_row_encode_failed(
"slot payload start exceeds u32 range",
)
})?;
let len = u32::try_from(bytes.len()).map_err(|_| {
InternalError::persisted_row_encode_failed(
"slot payload length exceeds u32 range",
)
})?;
payload_bytes.extend_from_slice(&bytes);
slot_table.push((start, len));
}
SlotBufferSlot::Missing => {
return Err(InternalError::persisted_row_encode_failed(format!(
"slot buffer writer did not emit slot {slot} for entity '{}'",
self.model.path()
)));
}
}
}
encode_slot_payload_from_parts(slot_count, slot_table.as_slice(), payload_bytes.as_slice())
}
}
impl SlotWriter for SlotBufferWriter {
fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
let payload = required_slot_payload_bytes(self.model, "slot buffer writer", slot, payload)?;
*entry = SlotBufferSlot::Set(payload.to_vec());
Ok(())
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
enum SlotBufferSlot {
Missing,
Set(Vec<u8>),
}
struct SerializedPatchWriter {
model: &'static EntityModel,
slots: Vec<PatchWriterSlot>,
}
impl SerializedPatchWriter {
fn for_model(model: &'static EntityModel) -> Self {
Self {
model,
slots: vec![PatchWriterSlot::Missing; model.fields().len()],
}
}
fn finish_complete(self) -> Result<SerializedUpdatePatch, InternalError> {
let mut entries = Vec::with_capacity(self.slots.len());
for (slot, payload) in self.slots.into_iter().enumerate() {
let field_slot = FieldSlot::from_index(self.model, slot)?;
let serialized = match payload {
PatchWriterSlot::Set(payload) => SerializedFieldUpdate::new(field_slot, payload),
PatchWriterSlot::Missing => {
return Err(InternalError::persisted_row_encode_failed(format!(
"serialized patch writer did not emit slot {slot} for entity '{}'",
self.model.path()
)));
}
};
entries.push(serialized);
}
Ok(SerializedUpdatePatch::new(entries))
}
}
impl SlotWriter for SerializedPatchWriter {
fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
let payload =
required_slot_payload_bytes(self.model, "serialized patch writer", slot, payload)?;
*entry = PatchWriterSlot::Set(payload.to_vec());
Ok(())
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
enum PatchWriterSlot {
Missing,
Set(Vec<u8>),
}
pub(in crate::db) struct StructuralSlotReader<'a> {
model: &'static EntityModel,
field_bytes: StructuralRowFieldBytes<'a>,
cached_values: Vec<CachedSlotValue>,
#[cfg(any(test, feature = "structural-read-metrics"))]
metrics: StructuralReadProbe,
}
impl<'a> StructuralSlotReader<'a> {
pub(in crate::db) fn from_raw_row(
raw_row: &'a RawRow,
model: &'static EntityModel,
) -> Result<Self, InternalError> {
let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
.map_err(StructuralRowDecodeError::into_internal_error)?;
let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
.take(model.fields().len())
.collect();
let mut reader = Self {
model,
field_bytes,
cached_values,
#[cfg(any(test, feature = "structural-read-metrics"))]
metrics: StructuralReadProbe::begin(model.fields().len()),
};
reader.validate_all_declared_slots()?;
Ok(reader)
}
pub(in crate::db) fn validate_storage_key(
&self,
data_key: &DataKey,
) -> Result<(), InternalError> {
self.validate_storage_key_value(data_key.storage_key())
}
pub(in crate::db) fn validate_storage_key_value(
&self,
expected_key: StorageKey,
) -> Result<(), InternalError> {
let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
return Err(InternalError::persisted_row_primary_key_field_missing(
self.model.path(),
));
};
let field = self.field_model(primary_key_slot)?;
let decoded_key = match self.get_scalar(primary_key_slot)? {
Some(ScalarSlotValueRef::Null) => None,
Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
None => Some(
decode_storage_key_field_bytes(
self.required_field_bytes(primary_key_slot, field.name())?,
field.kind,
)
.map_err(|err| {
InternalError::persisted_row_primary_key_not_storage_encodable(
expected_key,
err,
)
})?,
),
};
let Some(decoded_key) = decoded_key else {
return Err(InternalError::persisted_row_primary_key_slot_missing(
expected_key,
));
};
if decoded_key != expected_key {
return Err(InternalError::persisted_row_key_mismatch(
expected_key,
decoded_key,
));
}
Ok(())
}
fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
field_model_for_slot(self.model, slot)
}
fn validate_all_declared_slots(&mut self) -> Result<(), InternalError> {
for slot in 0..self.model.fields().len() {
self.validate_slot_into_cache(slot)?;
}
Ok(())
}
fn validate_slot_into_cache(&mut self, slot: usize) -> Result<(), InternalError> {
if !matches!(self.cached_values.get(slot), Some(CachedSlotValue::Pending)) {
return Ok(());
}
let field = field_model_for_slot(self.model, slot)?;
let raw_value = self
.field_bytes
.field(slot)
.ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))?;
let cached = match field.leaf_codec() {
LeafCodec::Scalar(codec) => CachedSlotValue::Scalar(materialize_scalar_slot_value(
decode_scalar_slot_value(raw_value, codec, field.name())?,
)),
LeafCodec::CborFallback => {
#[cfg(any(test, feature = "structural-read-metrics"))]
self.metrics.record_validated_non_scalar();
validate_non_scalar_slot_value(raw_value, field)?;
CachedSlotValue::Deferred {
materialized: OnceCell::new(),
}
}
};
self.cached_values[slot] = cached;
Ok(())
}
pub(in crate::db) fn into_decoded_values(
mut self,
) -> Result<Vec<Option<Value>>, InternalError> {
let model = self.model;
let cached_values = std::mem::take(&mut self.cached_values);
let mut values = Vec::with_capacity(cached_values.len());
for (slot, cached) in cached_values.into_iter().enumerate() {
match cached {
CachedSlotValue::Scalar(value) => values.push(Some(value)),
CachedSlotValue::Deferred { materialized } => {
let field = field_model_for_slot(model, slot)?;
let value = if let Some(value) = materialized.into_inner() {
value
} else {
#[cfg(any(test, feature = "structural-read-metrics"))]
self.metrics.record_materialized_non_scalar();
let raw_value = self.field_bytes.field(slot).ok_or_else(|| {
InternalError::persisted_row_declared_field_missing(field.name())
})?;
decode_slot_value_for_field(field, raw_value)?
};
values.push(Some(value));
}
CachedSlotValue::Pending => {
return Err(InternalError::persisted_row_decode_failed(format!(
"structural slot cache was not fully validated before consumption: slot={slot}",
)));
}
}
}
Ok(values)
}
fn required_cached_value(&self, slot: usize) -> Result<&Value, InternalError> {
let cached = self.cached_values.get(slot).ok_or_else(|| {
InternalError::persisted_row_slot_cache_lookup_out_of_bounds(self.model.path(), slot)
})?;
match cached {
CachedSlotValue::Scalar(value) => Ok(value),
CachedSlotValue::Deferred { materialized } => {
let field = self.field_model(slot)?;
let raw_value = self.required_field_bytes(slot, field.name())?;
if materialized.get().is_none() {
#[cfg(any(test, feature = "structural-read-metrics"))]
self.metrics.record_materialized_non_scalar();
let value = decode_slot_value_for_field(field, raw_value)?;
let _ = materialized.set(value);
}
materialized.get().ok_or_else(|| {
InternalError::persisted_row_decode_failed(format!(
"structural slot cache failed to materialize deferred value: slot={slot}",
))
})
}
CachedSlotValue::Pending => Err(InternalError::persisted_row_decode_failed(format!(
"structural slot cache missing validated value after row-open validation: slot={slot}",
))),
}
}
pub(in crate::db) fn required_field_bytes(
&self,
slot: usize,
field_name: &str,
) -> Result<&[u8], InternalError> {
self.field_bytes
.field(slot)
.ok_or_else(|| InternalError::persisted_row_declared_field_missing(field_name))
}
}
const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
match value {
ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
ScalarValueRef::Unit => Some(StorageKey::Unit),
_ => None,
}
}
fn scalar_slot_value_ref_from_cached_value(
value: &Value,
) -> Result<ScalarSlotValueRef<'_>, InternalError> {
let scalar = match value {
Value::Null => return Ok(ScalarSlotValueRef::Null),
Value::Blob(value) => ScalarValueRef::Blob(value.as_slice()),
Value::Bool(value) => ScalarValueRef::Bool(*value),
Value::Date(value) => ScalarValueRef::Date(*value),
Value::Duration(value) => ScalarValueRef::Duration(*value),
Value::Float32(value) => ScalarValueRef::Float32(*value),
Value::Float64(value) => ScalarValueRef::Float64(*value),
Value::Int(value) => ScalarValueRef::Int(*value),
Value::Principal(value) => ScalarValueRef::Principal(*value),
Value::Subaccount(value) => ScalarValueRef::Subaccount(*value),
Value::Text(value) => ScalarValueRef::Text(value.as_str()),
Value::Timestamp(value) => ScalarValueRef::Timestamp(*value),
Value::Uint(value) => ScalarValueRef::Uint(*value),
Value::Ulid(value) => ScalarValueRef::Ulid(*value),
Value::Unit => ScalarValueRef::Unit,
_ => {
return Err(InternalError::persisted_row_decode_failed(format!(
"cached structural scalar slot cannot borrow non-scalar value variant: {value:?}",
)));
}
};
Ok(ScalarSlotValueRef::Value(scalar))
}
fn materialize_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Value {
match value {
ScalarSlotValueRef::Null => Value::Null,
ScalarSlotValueRef::Value(value) => value.into_value(),
}
}
impl SlotReader for StructuralSlotReader<'_> {
fn model(&self) -> &'static EntityModel {
self.model
}
fn has(&self, slot: usize) -> bool {
self.field_bytes.field(slot).is_some()
}
fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
self.field_bytes.field(slot)
}
fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
let field = self.field_model(slot)?;
match field.leaf_codec() {
LeafCodec::Scalar(_codec) => match self.cached_values.get(slot) {
Some(CachedSlotValue::Scalar(value)) => {
scalar_slot_value_ref_from_cached_value(value).map(Some)
}
Some(CachedSlotValue::Pending) => {
Err(InternalError::persisted_row_decode_failed(format!(
"structural scalar slot cache missing validated value after row-open validation: slot={slot}",
)))
}
Some(CachedSlotValue::Deferred { .. }) => {
Err(InternalError::persisted_row_decode_failed(format!(
"structural scalar slot routed through non-scalar cache variant: slot={slot}",
)))
}
None => Err(
InternalError::persisted_row_slot_cache_lookup_out_of_bounds(
self.model.path(),
slot,
),
),
},
LeafCodec::CborFallback => Ok(None),
}
}
fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
self.validate_slot_into_cache(slot)?;
Ok(Some(self.required_cached_value(slot)?.clone()))
}
}
impl CanonicalSlotReader for StructuralSlotReader<'_> {
fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
Ok(self.required_cached_value(slot)?.clone())
}
fn required_value_by_contract_cow(&self, slot: usize) -> Result<Cow<'_, Value>, InternalError> {
Ok(Cow::Borrowed(self.required_cached_value(slot)?))
}
}
#[derive(Debug)]
enum CachedSlotValue {
Pending,
Scalar(Value),
Deferred { materialized: OnceCell<Value> },
}
#[cfg(any(test, feature = "structural-read-metrics"))]
#[cfg_attr(
all(test, not(feature = "structural-read-metrics")),
allow(unreachable_pub)
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct StructuralReadMetrics {
pub rows_opened: u64,
pub declared_slots_validated: u64,
pub validated_non_scalar_slots: u64,
pub materialized_non_scalar_slots: u64,
pub rows_without_lazy_non_scalar_materializations: u64,
}
#[cfg(any(test, feature = "structural-read-metrics"))]
std::thread_local! {
static STRUCTURAL_READ_METRICS: RefCell<Option<StructuralReadMetrics>> = const {
RefCell::new(None)
};
}
#[cfg(any(test, feature = "structural-read-metrics"))]
#[derive(Debug)]
struct StructuralReadProbe {
collect: bool,
declared_slots_validated: u64,
validated_non_scalar_slots: Cell<u64>,
materialized_non_scalar_slots: Cell<u64>,
}
#[cfg(any(test, feature = "structural-read-metrics"))]
impl StructuralReadProbe {
fn begin(field_count: usize) -> Self {
let collect = STRUCTURAL_READ_METRICS.with(|metrics| metrics.borrow().is_some());
Self {
collect,
declared_slots_validated: field_count as u64,
validated_non_scalar_slots: Cell::new(0),
materialized_non_scalar_slots: Cell::new(0),
}
}
fn record_validated_non_scalar(&self) {
if !self.collect {
return;
}
self.validated_non_scalar_slots
.set(self.validated_non_scalar_slots.get().saturating_add(1));
}
fn record_materialized_non_scalar(&self) {
if !self.collect {
return;
}
self.materialized_non_scalar_slots
.set(self.materialized_non_scalar_slots.get().saturating_add(1));
}
}
#[cfg(any(test, feature = "structural-read-metrics"))]
impl Drop for StructuralSlotReader<'_> {
fn drop(&mut self) {
if !self.metrics.collect {
return;
}
let validated_non_scalar_slots = self.metrics.validated_non_scalar_slots.get();
let materialized_non_scalar_slots = self.metrics.materialized_non_scalar_slots.get();
STRUCTURAL_READ_METRICS.with(|metrics| {
if let Some(aggregate) = metrics.borrow_mut().as_mut() {
aggregate.rows_opened = aggregate.rows_opened.saturating_add(1);
aggregate.declared_slots_validated = aggregate
.declared_slots_validated
.saturating_add(self.metrics.declared_slots_validated);
aggregate.validated_non_scalar_slots = aggregate
.validated_non_scalar_slots
.saturating_add(validated_non_scalar_slots);
aggregate.materialized_non_scalar_slots = aggregate
.materialized_non_scalar_slots
.saturating_add(materialized_non_scalar_slots);
if materialized_non_scalar_slots == 0 {
aggregate.rows_without_lazy_non_scalar_materializations = aggregate
.rows_without_lazy_non_scalar_materializations
.saturating_add(1);
}
}
});
}
}
#[cfg(any(test, feature = "structural-read-metrics"))]
#[cfg_attr(
all(test, not(feature = "structural-read-metrics")),
allow(unreachable_pub)
)]
pub fn with_structural_read_metrics<T>(f: impl FnOnce() -> T) -> (T, StructuralReadMetrics) {
STRUCTURAL_READ_METRICS.with(|metrics| {
debug_assert!(
metrics.borrow().is_none(),
"structural read metrics captures should not nest"
);
*metrics.borrow_mut() = Some(StructuralReadMetrics::default());
});
let result = f();
let metrics =
STRUCTURAL_READ_METRICS.with(|metrics| metrics.borrow_mut().take().unwrap_or_default());
(result, metrics)
}
#[cfg(test)]
mod tests {
use super::{
CachedSlotValue, FieldSlot, ScalarSlotValueRef, ScalarValueRef, SerializedFieldUpdate,
SerializedPatchWriter, SerializedUpdatePatch, SlotBufferWriter, SlotReader, SlotWriter,
UpdatePatch, apply_serialized_update_patch_to_raw_row, apply_update_patch_to_raw_row,
decode_persisted_custom_many_slot_payload, decode_persisted_custom_slot_payload,
decode_persisted_non_null_slot_payload, decode_persisted_option_slot_payload,
decode_persisted_slot_payload, decode_slot_value_by_contract, decode_slot_value_from_bytes,
encode_persisted_custom_many_slot_payload, encode_persisted_custom_slot_payload,
encode_scalar_slot_value, encode_slot_payload_from_parts, encode_slot_value_from_value,
serialize_entity_slots_as_update_patch, serialize_update_patch_fields,
with_structural_read_metrics,
};
use crate::{
db::{
codec::serialize_row_payload,
data::{RawRow, StructuralSlotReader, decode_structural_value_storage_bytes},
},
error::InternalError,
model::{
EntityModel,
field::{EnumVariantModel, FieldKind, FieldModel, FieldStorageDecode},
},
serialize::serialize,
testing::SIMPLE_ENTITY_TAG,
traits::{EntitySchema, FieldValue},
types::{
Account, Date, Decimal, Duration, Float32, Float64, Int, Int128, Nat, Nat128,
Principal, Subaccount, Timestamp, Ulid,
},
value::{Value, ValueEnum},
};
use icydb_derive::{FieldProjection, PersistedRow};
use serde::{Deserialize, Serialize};
crate::test_canister! {
ident = PersistedRowPatchBridgeCanister,
commit_memory_id = crate::testing::test_commit_memory_id(),
}
crate::test_store! {
ident = PersistedRowPatchBridgeStore,
canister = PersistedRowPatchBridgeCanister,
}
#[derive(
Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, PersistedRow, Serialize,
)]
struct PersistedRowPatchBridgeEntity {
id: crate::types::Ulid,
name: String,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
struct PersistedRowProfileValue {
bio: String,
}
impl FieldValue for PersistedRowProfileValue {
fn kind() -> crate::traits::FieldValueKind {
crate::traits::FieldValueKind::Structured { queryable: false }
}
fn to_value(&self) -> Value {
Value::from_map(vec![(
Value::Text("bio".to_string()),
Value::Text(self.bio.clone()),
)])
.expect("profile test value should encode as canonical map")
}
fn from_value(value: &Value) -> Option<Self> {
let Value::Map(entries) = value else {
return None;
};
let normalized = Value::normalize_map_entries(entries.clone()).ok()?;
let bio = normalized
.iter()
.find_map(|(entry_key, entry_value)| match entry_key {
Value::Text(entry_key) if entry_key == "bio" => match entry_value {
Value::Text(bio) => Some(bio.clone()),
_ => None,
},
_ => None,
})?;
if normalized.len() != 1 {
return None;
}
Some(Self { bio })
}
}
crate::test_entity_schema! {
ident = PersistedRowPatchBridgeEntity,
id = crate::types::Ulid,
id_field = id,
entity_name = "PersistedRowPatchBridgeEntity",
entity_tag = SIMPLE_ENTITY_TAG,
pk_index = 0,
fields = [
("id", FieldKind::Ulid),
("name", FieldKind::Text),
],
indexes = [],
store = PersistedRowPatchBridgeStore,
canister = PersistedRowPatchBridgeCanister,
}
static STATE_VARIANTS: &[EnumVariantModel] = &[EnumVariantModel::new(
"Loaded",
Some(&FieldKind::Uint),
FieldStorageDecode::ByKind,
)];
static FIELD_MODELS: [FieldModel; 2] = [
FieldModel::new("name", FieldKind::Text),
FieldModel::new_with_storage_decode("payload", FieldKind::Text, FieldStorageDecode::Value),
];
static LIST_FIELD_MODELS: [FieldModel; 1] =
[FieldModel::new("tags", FieldKind::List(&FieldKind::Text))];
static MAP_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
"props",
FieldKind::Map {
key: &FieldKind::Text,
value: &FieldKind::Uint,
},
)];
static ENUM_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
"state",
FieldKind::Enum {
path: "tests::State",
variants: STATE_VARIANTS,
},
)];
static ACCOUNT_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new("owner", FieldKind::Account)];
static REQUIRED_STRUCTURED_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
"profile",
FieldKind::Structured { queryable: false },
)];
static OPTIONAL_STRUCTURED_FIELD_MODELS: [FieldModel; 1] =
[FieldModel::new_with_storage_decode_and_nullability(
"profile",
FieldKind::Structured { queryable: false },
FieldStorageDecode::ByKind,
true,
)];
static VALUE_STORAGE_STRUCTURED_FIELD_MODELS: [FieldModel; 1] =
[FieldModel::new_with_storage_decode(
"manifest",
FieldKind::Structured { queryable: false },
FieldStorageDecode::Value,
)];
static STRUCTURED_MAP_VALUE_KIND: FieldKind = FieldKind::Structured { queryable: false };
static STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS: [FieldModel; 1] =
[FieldModel::new_with_storage_decode(
"projects",
FieldKind::Map {
key: &FieldKind::Principal,
value: &STRUCTURED_MAP_VALUE_KIND,
},
FieldStorageDecode::Value,
)];
static INDEX_MODELS: [&crate::model::index::IndexModel; 0] = [];
static TEST_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowFieldCodecEntity",
"persisted_row_field_codec_entity",
&FIELD_MODELS[0],
&FIELD_MODELS,
&INDEX_MODELS,
);
static LIST_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowListFieldCodecEntity",
"persisted_row_list_field_codec_entity",
&LIST_FIELD_MODELS[0],
&LIST_FIELD_MODELS,
&INDEX_MODELS,
);
static MAP_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowMapFieldCodecEntity",
"persisted_row_map_field_codec_entity",
&MAP_FIELD_MODELS[0],
&MAP_FIELD_MODELS,
&INDEX_MODELS,
);
static ENUM_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowEnumFieldCodecEntity",
"persisted_row_enum_field_codec_entity",
&ENUM_FIELD_MODELS[0],
&ENUM_FIELD_MODELS,
&INDEX_MODELS,
);
static ACCOUNT_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowAccountFieldCodecEntity",
"persisted_row_account_field_codec_entity",
&ACCOUNT_FIELD_MODELS[0],
&ACCOUNT_FIELD_MODELS,
&INDEX_MODELS,
);
static REQUIRED_STRUCTURED_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowRequiredStructuredFieldCodecEntity",
"persisted_row_required_structured_field_codec_entity",
&REQUIRED_STRUCTURED_FIELD_MODELS[0],
&REQUIRED_STRUCTURED_FIELD_MODELS,
&INDEX_MODELS,
);
static OPTIONAL_STRUCTURED_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowOptionalStructuredFieldCodecEntity",
"persisted_row_optional_structured_field_codec_entity",
&OPTIONAL_STRUCTURED_FIELD_MODELS[0],
&OPTIONAL_STRUCTURED_FIELD_MODELS,
&INDEX_MODELS,
);
static VALUE_STORAGE_STRUCTURED_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowValueStorageStructuredFieldCodecEntity",
"persisted_row_value_storage_structured_field_codec_entity",
&VALUE_STORAGE_STRUCTURED_FIELD_MODELS[0],
&VALUE_STORAGE_STRUCTURED_FIELD_MODELS,
&INDEX_MODELS,
);
static STRUCTURED_MAP_VALUE_STORAGE_MODEL: EntityModel = EntityModel::new(
"tests::PersistedRowStructuredMapValueStorageEntity",
"persisted_row_structured_map_value_storage_entity",
&STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS[0],
&STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS,
&INDEX_MODELS,
);
fn representative_value_storage_cases() -> Vec<Value> {
let nested = Value::from_map(vec![
(
Value::Text("blob".to_string()),
Value::Blob(vec![0x10, 0x20, 0x30]),
),
(
Value::Text("i128".to_string()),
Value::Int128(Int128::from(-123i128)),
),
(
Value::Text("u128".to_string()),
Value::Uint128(Nat128::from(456u128)),
),
(
Value::Text("enum".to_string()),
Value::Enum(
ValueEnum::new("Loaded", Some("tests::PersistedRowManifest"))
.with_payload(Value::Blob(vec![0xAA, 0xBB])),
),
),
])
.expect("nested value storage case should normalize");
vec![
Value::Account(Account::dummy(7)),
Value::Blob(vec![1u8, 2u8, 3u8]),
Value::Bool(true),
Value::Date(Date::new(2024, 1, 2)),
Value::Decimal(Decimal::new(123, 2)),
Value::Duration(Duration::from_secs(1)),
Value::Enum(
ValueEnum::new("Ready", Some("tests::PersistedRowState"))
.with_payload(nested.clone()),
),
Value::Float32(Float32::try_new(1.25).expect("float32 sample should be finite")),
Value::Float64(Float64::try_new(2.5).expect("float64 sample should be finite")),
Value::Int(-7),
Value::Int128(Int128::from(123i128)),
Value::IntBig(Int::from(99i32)),
Value::List(vec![
Value::Blob(vec![0xCC, 0xDD]),
Value::Text("nested".to_string()),
nested.clone(),
]),
nested,
Value::Null,
Value::Principal(Principal::dummy(9)),
Value::Subaccount(Subaccount::new([7u8; 32])),
Value::Text("example".to_string()),
Value::Timestamp(Timestamp::from_secs(1)),
Value::Uint(7),
Value::Uint128(Nat128::from(9u128)),
Value::UintBig(Nat::from(11u64)),
Value::Ulid(Ulid::from_u128(42)),
Value::Unit,
]
}
fn representative_structured_value_storage_cases() -> Vec<Value> {
let nested_map = Value::from_map(vec![
(
Value::Text("account".to_string()),
Value::Account(Account::dummy(7)),
),
(
Value::Text("blob".to_string()),
Value::Blob(vec![1u8, 2u8, 3u8]),
),
(Value::Text("bool".to_string()), Value::Bool(true)),
(
Value::Text("date".to_string()),
Value::Date(Date::new(2024, 1, 2)),
),
(
Value::Text("decimal".to_string()),
Value::Decimal(Decimal::new(123, 2)),
),
(
Value::Text("duration".to_string()),
Value::Duration(Duration::from_secs(1)),
),
(
Value::Text("enum".to_string()),
Value::Enum(
ValueEnum::new("Loaded", Some("tests::PersistedRowManifest"))
.with_payload(Value::Blob(vec![0xAA, 0xBB])),
),
),
(
Value::Text("f32".to_string()),
Value::Float32(Float32::try_new(1.25).expect("float32 sample should be finite")),
),
(
Value::Text("f64".to_string()),
Value::Float64(Float64::try_new(2.5).expect("float64 sample should be finite")),
),
(Value::Text("i64".to_string()), Value::Int(-7)),
(
Value::Text("i128".to_string()),
Value::Int128(Int128::from(123i128)),
),
(
Value::Text("ibig".to_string()),
Value::IntBig(Int::from(99i32)),
),
(Value::Text("null".to_string()), Value::Null),
(
Value::Text("principal".to_string()),
Value::Principal(Principal::dummy(9)),
),
(
Value::Text("subaccount".to_string()),
Value::Subaccount(Subaccount::new([7u8; 32])),
),
(
Value::Text("text".to_string()),
Value::Text("example".to_string()),
),
(
Value::Text("timestamp".to_string()),
Value::Timestamp(Timestamp::from_secs(1)),
),
(Value::Text("u64".to_string()), Value::Uint(7)),
(
Value::Text("u128".to_string()),
Value::Uint128(Nat128::from(9u128)),
),
(
Value::Text("ubig".to_string()),
Value::UintBig(Nat::from(11u64)),
),
(
Value::Text("ulid".to_string()),
Value::Ulid(Ulid::from_u128(42)),
),
(Value::Text("unit".to_string()), Value::Unit),
])
.expect("structured value-storage map should normalize");
vec![
nested_map.clone(),
Value::List(vec![
Value::Blob(vec![0xCC, 0xDD]),
Value::Text("nested".to_string()),
nested_map,
]),
]
}
fn encode_slot_payload_allowing_missing_for_tests(
model: &'static EntityModel,
slots: &[Option<&[u8]>],
) -> Result<Vec<u8>, InternalError> {
if slots.len() != model.fields().len() {
return Err(InternalError::persisted_row_encode_failed(format!(
"noncanonical slot payload test helper expected {} slots for entity '{}', found {}",
model.fields().len(),
model.path(),
slots.len()
)));
}
let mut payload_bytes = Vec::new();
let mut slot_table = Vec::with_capacity(slots.len());
for slot_payload in slots {
match slot_payload {
Some(bytes) => {
let start = u32::try_from(payload_bytes.len()).map_err(|_| {
InternalError::persisted_row_encode_failed(
"slot payload start exceeds u32 range",
)
})?;
let len = u32::try_from(bytes.len()).map_err(|_| {
InternalError::persisted_row_encode_failed(
"slot payload length exceeds u32 range",
)
})?;
payload_bytes.extend_from_slice(bytes);
slot_table.push((start, len));
}
None => slot_table.push((0_u32, 0_u32)),
}
}
encode_slot_payload_from_parts(slots.len(), slot_table.as_slice(), payload_bytes.as_slice())
}
#[test]
fn decode_slot_value_from_bytes_decodes_scalar_slots_through_one_owner() {
let payload =
encode_scalar_slot_value(ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")));
let value =
decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
assert_eq!(value, Value::Text("Ada".to_string()));
}
#[test]
fn decode_slot_value_from_bytes_reports_scalar_prefix_bytes() {
let err = decode_slot_value_from_bytes(&TEST_MODEL, 0, &[0x00, 1])
.expect_err("invalid scalar slot prefix should fail closed");
assert!(
err.message
.contains("expected slot envelope prefix byte 0xFF, found 0x00"),
"unexpected error: {err:?}"
);
}
#[test]
fn decode_slot_value_from_bytes_respects_value_storage_decode_contract() {
let payload = crate::serialize::serialize(&Value::Text("Ada".to_string()))
.expect("encode value-storage payload");
let value =
decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
assert_eq!(value, Value::Text("Ada".to_string()));
}
#[test]
fn encode_slot_value_from_value_roundtrips_scalar_slots() {
let payload = encode_slot_value_from_value(&TEST_MODEL, 0, &Value::Text("Ada".to_string()))
.expect("encode slot");
let decoded =
decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
assert_eq!(decoded, Value::Text("Ada".to_string()));
}
#[test]
fn encode_slot_value_from_value_roundtrips_value_storage_slots() {
let payload = encode_slot_value_from_value(&TEST_MODEL, 1, &Value::Text("Ada".to_string()))
.expect("encode slot");
let decoded =
decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
assert_eq!(decoded, Value::Text("Ada".to_string()));
}
#[test]
fn encode_slot_value_from_value_roundtrips_structured_value_storage_slots_for_all_cases() {
for value in representative_structured_value_storage_cases() {
let payload = encode_slot_value_from_value(&VALUE_STORAGE_STRUCTURED_MODEL, 0, &value)
.unwrap_or_else(|err| {
panic!(
"structured value-storage slot should encode for value {value:?}: {err:?}"
)
});
let decoded = decode_slot_value_from_bytes(
&VALUE_STORAGE_STRUCTURED_MODEL,
0,
payload.as_slice(),
)
.unwrap_or_else(|err| {
panic!(
"structured value-storage slot should decode for value {value:?} with payload {payload:?}: {err:?}"
)
});
assert_eq!(decoded, value);
}
}
#[test]
fn encode_slot_value_from_value_roundtrips_list_by_kind_slots() {
let payload = encode_slot_value_from_value(
&LIST_MODEL,
0,
&Value::List(vec![Value::Text("alpha".to_string())]),
)
.expect("encode list slot");
let decoded =
decode_slot_value_from_bytes(&LIST_MODEL, 0, payload.as_slice()).expect("decode slot");
assert_eq!(decoded, Value::List(vec![Value::Text("alpha".to_string())]),);
}
#[test]
fn encode_slot_value_from_value_roundtrips_map_by_kind_slots() {
let payload = encode_slot_value_from_value(
&MAP_MODEL,
0,
&Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
)
.expect("encode map slot");
let decoded =
decode_slot_value_from_bytes(&MAP_MODEL, 0, payload.as_slice()).expect("decode slot");
assert_eq!(
decoded,
Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
);
}
#[test]
fn encode_slot_value_from_value_accepts_value_storage_maps_with_structured_values() {
let principal = Principal::dummy(7);
let project = Value::from_map(vec![
(Value::Text("pid".to_string()), Value::Principal(principal)),
(
Value::Text("status".to_string()),
Value::Enum(ValueEnum::new(
"Saved",
Some("design::app::user::customise::project::ProjectStatus"),
)),
),
])
.expect("project value should normalize into a canonical map");
let projects = Value::from_map(vec![(Value::Principal(principal), project)])
.expect("outer map should normalize into a canonical map");
let payload =
encode_slot_value_from_value(&STRUCTURED_MAP_VALUE_STORAGE_MODEL, 0, &projects)
.expect("encode structured map slot");
let decoded = decode_slot_value_from_bytes(
&STRUCTURED_MAP_VALUE_STORAGE_MODEL,
0,
payload.as_slice(),
)
.expect("decode structured map slot");
assert_eq!(decoded, projects);
}
#[test]
fn structured_value_storage_cases_decode_through_direct_value_storage_boundary() {
for value in representative_value_storage_cases() {
let payload = serialize(&value).unwrap_or_else(|err| {
panic!(
"structured value-storage payload should serialize for value {value:?}: {err:?}"
)
});
let decoded = decode_structural_value_storage_bytes(payload.as_slice()).unwrap_or_else(
|err| {
panic!(
"structured value-storage payload should decode for value {value:?} with payload {payload:?}: {err:?}"
)
},
);
assert_eq!(decoded, value);
}
}
#[test]
fn encode_slot_value_from_value_roundtrips_enum_by_kind_slots() {
let payload = encode_slot_value_from_value(
&ENUM_MODEL,
0,
&Value::Enum(
ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7)),
),
)
.expect("encode enum slot");
let decoded =
decode_slot_value_from_bytes(&ENUM_MODEL, 0, payload.as_slice()).expect("decode slot");
assert_eq!(
decoded,
Value::Enum(
ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7,))
),
);
}
#[test]
fn encode_slot_value_from_value_roundtrips_leaf_by_kind_wrapper_slots() {
let account = Account::from_parts(Principal::dummy(7), Some(Subaccount::from([7_u8; 32])));
let payload = encode_slot_value_from_value(&ACCOUNT_MODEL, 0, &Value::Account(account))
.expect("encode account slot");
let decoded = decode_slot_value_from_bytes(&ACCOUNT_MODEL, 0, payload.as_slice())
.expect("decode slot");
assert_eq!(decoded, Value::Account(account));
}
#[test]
fn custom_slot_payload_roundtrips_structured_field_value() {
let profile = PersistedRowProfileValue {
bio: "Ada".to_string(),
};
let payload = encode_persisted_custom_slot_payload(&profile, "profile")
.expect("encode custom structured payload");
let decoded = decode_persisted_custom_slot_payload::<PersistedRowProfileValue>(
payload.as_slice(),
"profile",
)
.expect("decode custom structured payload");
assert_eq!(decoded, profile);
assert_eq!(
decode_persisted_slot_payload::<Value>(payload.as_slice(), "profile")
.expect("decode raw value payload"),
profile.to_value(),
);
}
#[test]
fn custom_many_slot_payload_roundtrips_structured_value_lists() {
let profiles = vec![
PersistedRowProfileValue {
bio: "Ada".to_string(),
},
PersistedRowProfileValue {
bio: "Grace".to_string(),
},
];
let payload = encode_persisted_custom_many_slot_payload(profiles.as_slice(), "profiles")
.expect("encode custom structured list payload");
let decoded = decode_persisted_custom_many_slot_payload::<PersistedRowProfileValue>(
payload.as_slice(),
"profiles",
)
.expect("decode custom structured list payload");
assert_eq!(decoded, profiles);
}
#[test]
fn decode_persisted_non_null_slot_payload_rejects_null_for_required_structured_fields() {
let err =
decode_persisted_non_null_slot_payload::<PersistedRowProfileValue>(&[0xF6], "profile")
.expect_err("required structured payload must reject null");
assert!(
err.message
.contains("unexpected null for non-nullable field"),
"unexpected error: {err:?}"
);
}
#[test]
fn decode_persisted_option_slot_payload_treats_cbor_null_as_none() {
let decoded =
decode_persisted_option_slot_payload::<PersistedRowProfileValue>(&[0xF6], "profile")
.expect("optional structured payload should decode");
assert_eq!(decoded, None);
}
#[test]
fn encode_slot_value_from_value_rejects_null_for_required_structured_slots() {
let err = encode_slot_value_from_value(&REQUIRED_STRUCTURED_MODEL, 0, &Value::Null)
.expect_err("required structured slot must reject null");
assert!(
err.message.contains("required field cannot store null"),
"unexpected error: {err:?}"
);
}
#[test]
fn encode_slot_value_from_value_allows_null_for_optional_structured_slots() {
let payload = encode_slot_value_from_value(&OPTIONAL_STRUCTURED_MODEL, 0, &Value::Null)
.expect("optional structured slot should allow null");
let decoded =
decode_slot_value_from_bytes(&OPTIONAL_STRUCTURED_MODEL, 0, payload.as_slice())
.expect("optional structured slot should decode");
assert_eq!(decoded, Value::Null);
}
#[test]
fn encode_slot_value_from_value_rejects_unknown_enum_payload_variants() {
let err = encode_slot_value_from_value(
&ENUM_MODEL,
0,
&Value::Enum(
ValueEnum::new("Unknown", Some("tests::State")).with_payload(Value::Uint(7)),
),
)
.expect_err("unknown enum payload should fail closed");
assert!(err.message.contains("unknown enum variant"));
}
#[test]
fn structural_slot_reader_and_direct_decode_share_the_same_field_codec_boundary() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let direct_slots =
StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
let mut cached_slots =
StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
let direct_name = decode_slot_value_by_contract(&direct_slots, 0).expect("decode name");
let direct_payload =
decode_slot_value_by_contract(&direct_slots, 1).expect("decode payload");
let cached_name = cached_slots.get_value(0).expect("cached name");
let cached_payload = cached_slots.get_value(1).expect("cached payload");
assert_eq!(direct_name, cached_name);
assert_eq!(direct_payload, cached_payload);
}
#[test]
fn structural_slot_reader_validates_declared_slots_but_defers_non_scalar_materialization() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let mut reader = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
.expect("row-open validation should succeed");
match &reader.cached_values[0] {
CachedSlotValue::Scalar(value) => {
assert_eq!(
value,
&Value::Text("Ada".to_string()),
"scalar slot should stay on the validated scalar fast path",
);
}
other => panic!("expected validated scalar cache for slot 0, found {other:?}"),
}
match &reader.cached_values[1] {
CachedSlotValue::Deferred { materialized } => {
assert!(
materialized.get().is_none(),
"non-scalar slot should validate at row-open without eagerly materializing a runtime Value",
);
}
other => panic!("expected deferred cache for slot 1, found {other:?}"),
}
assert_eq!(
reader.get_value(1).expect("decode deferred slot"),
Some(Value::Text("payload".to_string()))
);
match &reader.cached_values[1] {
CachedSlotValue::Deferred { materialized } => {
assert_eq!(
materialized.get(),
Some(&Value::Text("payload".to_string())),
"non-scalar slot should materialize on first semantic access",
);
}
other => panic!("expected deferred cache for slot 1, found {other:?}"),
}
}
#[test]
fn structural_slot_reader_metrics_report_zero_non_scalar_materializations_for_scalar_only_access()
{
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let (_scalar_read, metrics) = with_structural_read_metrics(|| {
let reader = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
.expect("row-open validation should succeed");
matches!(
reader
.get_scalar(0)
.expect("scalar fast path should succeed"),
Some(ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
)
});
assert_eq!(metrics.rows_opened, 1);
assert_eq!(metrics.declared_slots_validated, 2);
assert_eq!(metrics.validated_non_scalar_slots, 1);
assert_eq!(
metrics.materialized_non_scalar_slots, 0,
"scalar-only access should not materialize the unused value-storage slot",
);
assert_eq!(metrics.rows_without_lazy_non_scalar_materializations, 1);
}
#[test]
fn structural_slot_reader_metrics_report_one_non_scalar_materialization_on_first_semantic_access()
{
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let (_value, metrics) = with_structural_read_metrics(|| {
let mut reader = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
.expect("row-open validation should succeed");
reader
.get_value(1)
.expect("deferred slot should materialize")
});
assert_eq!(metrics.rows_opened, 1);
assert_eq!(metrics.declared_slots_validated, 2);
assert_eq!(metrics.validated_non_scalar_slots, 1);
assert_eq!(
metrics.materialized_non_scalar_slots, 1,
"first semantic access should materialize the value-storage slot exactly once",
);
assert_eq!(metrics.rows_without_lazy_non_scalar_materializations, 0);
}
#[test]
fn structural_slot_reader_rejects_malformed_unused_value_storage_slot_at_row_open() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(&[0xFF]))
.expect("write malformed value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
.err()
.expect("malformed unused value-storage slot must still fail at row-open");
assert!(
err.message.contains("field 'payload'"),
"unexpected error: {err:?}"
);
}
#[test]
fn apply_update_patch_to_raw_row_updates_only_targeted_slots() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let patch = UpdatePatch::new().set(
FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
Value::Text("Grace".to_string()),
);
let patched =
apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
let mut reader =
StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
assert_eq!(
reader.get_value(0).expect("decode slot"),
Some(Value::Text("Grace".to_string()))
);
assert_eq!(
reader.get_value(1).expect("decode slot"),
Some(Value::Text("payload".to_string()))
);
}
#[test]
fn serialize_update_patch_fields_encodes_canonical_slot_payloads() {
let patch = UpdatePatch::new()
.set(
FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
Value::Text("Grace".to_string()),
)
.set(
FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
Value::Text("payload".to_string()),
);
let serialized =
serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
assert_eq!(serialized.entries().len(), 2);
assert_eq!(
decode_slot_value_from_bytes(
&TEST_MODEL,
serialized.entries()[0].slot().index(),
serialized.entries()[0].payload(),
)
.expect("decode slot payload"),
Value::Text("Grace".to_string())
);
assert_eq!(
decode_slot_value_from_bytes(
&TEST_MODEL,
serialized.entries()[1].slot().index(),
serialized.entries()[1].payload(),
)
.expect("decode slot payload"),
Value::Text("payload".to_string())
);
}
#[test]
fn serialized_patch_writer_rejects_clear_slots() {
let mut writer = SerializedPatchWriter::for_model(&TEST_MODEL);
let err = writer
.write_slot(0, None)
.expect_err("0.65 patch staging must reject missing-slot clears");
assert!(
err.message
.contains("serialized patch writer cannot clear slot 0"),
"unexpected error: {err:?}"
);
assert!(
err.message.contains(TEST_MODEL.path()),
"unexpected error: {err:?}"
);
}
#[test]
fn slot_buffer_writer_rejects_clear_slots() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let err = writer
.write_slot(0, None)
.expect_err("canonical row staging must reject missing-slot clears");
assert!(
err.message
.contains("slot buffer writer cannot clear slot 0"),
"unexpected error: {err:?}"
);
assert!(
err.message.contains(TEST_MODEL.path()),
"unexpected error: {err:?}"
);
}
#[test]
fn apply_update_patch_to_raw_row_uses_last_write_wins() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
let patch = UpdatePatch::new()
.set(slot, Value::Text("Grace".to_string()))
.set(slot, Value::Text("Lin".to_string()));
let patched =
apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
let mut reader =
StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
assert_eq!(
reader.get_value(0).expect("decode slot"),
Some(Value::Text("Lin".to_string()))
);
}
#[test]
fn apply_update_patch_to_raw_row_rejects_noncanonical_missing_slot_baseline() {
let empty_slots = vec![None::<&[u8]>; TEST_MODEL.fields().len()];
let raw_row = RawRow::try_new(
serialize_row_payload(
encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, empty_slots.as_slice())
.expect("encode malformed slot payload"),
)
.expect("serialize row payload"),
)
.expect("build raw row");
let patch = UpdatePatch::new().set(
FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
Value::Text("payload".to_string()),
);
let err = apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch)
.expect_err("noncanonical rows with missing slots must fail closed");
assert_eq!(err.message, "row decode: missing slot payload: slot=0");
}
#[test]
fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_baseline() {
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
let malformed_slots = [Some([0xF6].as_slice()), Some(payload.as_slice())];
let raw_row = RawRow::try_new(
serialize_row_payload(
encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, &malformed_slots)
.expect("encode malformed slot payload"),
)
.expect("serialize row payload"),
)
.expect("build raw row");
let patch = UpdatePatch::new().set(
FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
Value::Text("patched".to_string()),
);
let serialized =
serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
.expect_err("noncanonical scalar baseline must fail closed");
assert!(
err.message.contains("field 'name'"),
"unexpected error: {err:?}"
);
assert!(
err.message
.contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
"unexpected error: {err:?}"
);
}
#[test]
fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_patch_payload() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
vec![0xF6],
)]);
let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
.expect_err("noncanonical serialized patch payload must fail closed");
assert!(
err.message.contains("field 'name'"),
"unexpected error: {err:?}"
);
assert!(
err.message
.contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
"unexpected error: {err:?}"
);
}
#[test]
fn structural_slot_reader_rejects_slot_count_mismatch() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write payload slot");
let mut payload = writer.finish().expect("finish slot payload");
payload[..2].copy_from_slice(&1_u16.to_be_bytes());
let raw_row =
RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
.expect("build raw row");
let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
.err()
.expect("slot-count drift must fail closed");
assert_eq!(
err.message,
"row decode: slot count mismatch: expected 2, found 1"
);
}
#[test]
fn structural_slot_reader_rejects_slot_span_exceeds_payload_length() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write payload slot");
let mut payload = writer.finish().expect("finish slot payload");
payload[14..18].copy_from_slice(&u32::MAX.to_be_bytes());
let raw_row =
RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
.expect("build raw row");
let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
.err()
.expect("slot span drift must fail closed");
assert_eq!(err.message, "row decode: slot span exceeds payload length");
}
#[test]
fn apply_serialized_update_patch_to_raw_row_replays_preencoded_slots() {
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
writer
.write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
.expect("write scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize row payload"),
)
.expect("build raw row");
let patch = UpdatePatch::new().set(
FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
Value::Text("Grace".to_string()),
);
let serialized =
serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
let patched = raw_row
.apply_serialized_update_patch(&TEST_MODEL, &serialized)
.expect("apply serialized patch");
let mut reader =
StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
assert_eq!(
reader.get_value(0).expect("decode slot"),
Some(Value::Text("Grace".to_string()))
);
}
#[test]
fn serialize_entity_slots_as_update_patch_replays_full_typed_after_image() {
let old_entity = PersistedRowPatchBridgeEntity {
id: crate::types::Ulid::from_u128(7),
name: "Ada".to_string(),
};
let new_entity = PersistedRowPatchBridgeEntity {
id: crate::types::Ulid::from_u128(7),
name: "Grace".to_string(),
};
let raw_row = RawRow::from_entity(&old_entity).expect("encode old row");
let old_decoded = raw_row
.try_decode::<PersistedRowPatchBridgeEntity>()
.expect("decode old entity");
let serialized =
serialize_entity_slots_as_update_patch(&new_entity).expect("serialize entity patch");
let direct =
RawRow::from_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
.expect("direct row emission should succeed");
let patched = raw_row
.apply_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
.expect("apply serialized patch");
let decoded = patched
.try_decode::<PersistedRowPatchBridgeEntity>()
.expect("decode patched entity");
assert_eq!(
direct, patched,
"fresh row emission and replayed full-image patch must converge on identical bytes",
);
assert_eq!(old_decoded, old_entity);
assert_eq!(decoded, new_entity);
}
#[test]
fn canonical_row_from_raw_row_replays_canonical_full_image_bytes() {
let entity = PersistedRowPatchBridgeEntity {
id: crate::types::Ulid::from_u128(11),
name: "Ada".to_string(),
};
let raw_row = RawRow::from_entity(&entity).expect("encode canonical row");
let canonical =
super::canonical_row_from_raw_row(PersistedRowPatchBridgeEntity::MODEL, &raw_row)
.expect("canonical re-emission should succeed");
assert_eq!(
canonical.as_bytes(),
raw_row.as_bytes(),
"canonical raw-row rebuild must preserve already canonical row bytes",
);
}
#[test]
fn canonical_row_from_raw_row_rejects_noncanonical_scalar_payload() {
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
writer
.write_slot(0, Some(&[0xF6]))
.expect("write malformed scalar slot");
writer
.write_slot(1, Some(payload.as_slice()))
.expect("write value-storage slot");
let raw_row = RawRow::try_new(
serialize_row_payload(writer.finish().expect("finish slot payload"))
.expect("serialize malformed row"),
)
.expect("build malformed raw row");
let err = super::canonical_row_from_raw_row(&TEST_MODEL, &raw_row)
.expect_err("canonical raw-row rebuild must reject malformed scalar payloads");
assert!(
err.message.contains("field 'name'"),
"unexpected error: {err:?}"
);
assert!(
err.message
.contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
"unexpected error: {err:?}"
);
}
#[test]
fn raw_row_from_serialized_update_patch_rejects_noncanonical_scalar_payload() {
let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload");
let serialized = SerializedUpdatePatch::new(vec![
SerializedFieldUpdate::new(
FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
vec![0xF6],
),
SerializedFieldUpdate::new(
FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
payload,
),
]);
let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
.expect_err("fresh row emission must reject noncanonical serialized patch payloads");
assert!(
err.message.contains("field 'name'"),
"unexpected error: {err:?}"
);
assert!(
err.message
.contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
"unexpected error: {err:?}"
);
}
#[test]
fn raw_row_from_serialized_update_patch_rejects_incomplete_slot_image() {
let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
crate::serialize::serialize(&Value::Text("payload".to_string()))
.expect("encode value-storage payload"),
)]);
let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
.expect_err("fresh row emission must reject missing declared slots");
assert!(
err.message.contains("serialized patch did not emit slot 0"),
"unexpected error: {err:?}"
);
}
}