Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::{
9        codec::serialize_row_payload,
10        data::{
11            CanonicalRow, DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
12            decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
13            decode_structural_value_storage_bytes,
14        },
15        scalar_expr::compile_scalar_literal_expr_value,
16        schema::{field_type_from_model_kind, literal_matches_type},
17    },
18    error::InternalError,
19    model::{
20        entity::{EntityModel, resolve_field_slot, resolve_primary_key_slot},
21        field::{FieldKind, FieldModel, FieldStorageDecode, LeafCodec, ScalarCodec},
22    },
23    serialize::{deserialize, serialize},
24    traits::{EntityKind, FieldValue, field_value_vec_from_value},
25    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid, Unit},
26    value::{StorageKey, Value, ValueEnum},
27};
28use serde_cbor::{Value as CborValue, value::to_value as to_cbor_value};
29use std::{cmp::Ordering, collections::BTreeMap, str};
30
// Marker byte and null/value tags for the scalar slot envelope.
// NOTE(review): the exact byte layout is owned by the scalar encode/decode
// helpers elsewhere in this file — confirm there before changing any value.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
const SCALAR_SLOT_TAG_NULL: u8 = 0x00;
const SCALAR_SLOT_TAG_VALUE: u8 = 0x01;
// One-byte CBOR encoding of `null` (simple value 22).
const CBOR_NULL_PAYLOAD: [u8; 1] = [0xF6];

// Fixed payload widths, in bytes, for the fixed-size scalar leaves.
const SCALAR_BOOL_PAYLOAD_LEN: usize = 1;
const SCALAR_WORD32_PAYLOAD_LEN: usize = std::mem::size_of::<u32>();
const SCALAR_WORD64_PAYLOAD_LEN: usize = std::mem::size_of::<u64>();
const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// Single-byte tags used for a persisted boolean payload.
const SCALAR_BOOL_FALSE_TAG: u8 = 0x00;
const SCALAR_BOOL_TRUE_TAG: u8 = 0x01;
44
45///
46/// FieldSlot
47///
48/// FieldSlot
49///
50/// FieldSlot is the structural stable slot reference used by the `0.64`
51/// patching path.
52/// It intentionally carries only the model-local slot index so field-level
53/// mutation stays structural instead of reintroducing typed entity helpers.
54///
55
56#[allow(dead_code)]
57#[derive(Clone, Copy, Debug, Eq, PartialEq)]
58pub(in crate::db) struct FieldSlot {
59    index: usize,
60}
61
62#[allow(dead_code)]
63impl FieldSlot {
64    /// Resolve one stable field slot by runtime field name.
65    #[must_use]
66    pub(in crate::db) fn resolve(model: &'static EntityModel, field_name: &str) -> Option<Self> {
67        resolve_field_slot(model, field_name).map(|index| Self { index })
68    }
69
70    /// Build one stable field slot from an already validated index.
71    pub(in crate::db) fn from_index(
72        model: &'static EntityModel,
73        index: usize,
74    ) -> Result<Self, InternalError> {
75        field_model_for_slot(model, index)?;
76
77        Ok(Self { index })
78    }
79
80    /// Return the stable slot index inside `EntityModel::fields`.
81    #[must_use]
82    pub(in crate::db) const fn index(self) -> usize {
83        self.index
84    }
85}
86
///
/// FieldUpdate
///
/// FieldUpdate carries one ordered field-level mutation over the structural
/// persisted-row boundary: a target slot plus the runtime `Value` to store.
/// `UpdatePatch` applies these entries in order and last write wins for the
/// same slot.
///

#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct FieldUpdate {
    // Target slot inside the owning entity model.
    slot: FieldSlot,
    // Runtime value to store; it is not validated or encoded at construction.
    value: Value,
}

#[allow(dead_code)]
impl FieldUpdate {
    /// Build one field-level structural update.
    ///
    /// No validation happens here; the pair is stored as given.
    #[must_use]
    pub(in crate::db) const fn new(slot: FieldSlot, value: Value) -> Self {
        Self { slot, value }
    }

    /// Return the stable target slot (copied; `FieldSlot` is `Copy`).
    #[must_use]
    pub(in crate::db) const fn slot(&self) -> FieldSlot {
        self.slot
    }

    /// Borrow the runtime value payload for this update.
    #[must_use]
    pub(in crate::db) const fn value(&self) -> &Value {
        &self.value
    }
}
125
126///
127/// UpdatePatch
128///
129/// UpdatePatch
130///
131/// UpdatePatch is the ordered structural mutation program applied to one
132/// persisted row.
133/// This is the phase-1 `0.64` patch container: it updates slot values
134/// structurally and then re-encodes the full row.
135///
136
137#[derive(Clone, Debug, Default, Eq, PartialEq)]
138pub struct UpdatePatch {
139    entries: Vec<FieldUpdate>,
140}
141
142impl UpdatePatch {
143    /// Build one empty patch.
144    #[must_use]
145    pub const fn new() -> Self {
146        Self {
147            entries: Vec::new(),
148        }
149    }
150
151    /// Append one structural field update in declaration order.
152    #[must_use]
153    pub(in crate::db) fn set(mut self, slot: FieldSlot, value: Value) -> Self {
154        self.entries.push(FieldUpdate::new(slot, value));
155        self
156    }
157
158    /// Resolve one field name and append its structural update.
159    pub fn set_field(
160        self,
161        model: &'static EntityModel,
162        field_name: &str,
163        value: Value,
164    ) -> Result<Self, InternalError> {
165        let Some(slot) = FieldSlot::resolve(model, field_name) else {
166            return Err(InternalError::mutation_structural_field_unknown(
167                model.path(),
168                field_name,
169            ));
170        };
171
172        Ok(self.set(slot, value))
173    }
174
175    /// Borrow the ordered field updates carried by this patch.
176    #[must_use]
177    pub(in crate::db) const fn entries(&self) -> &[FieldUpdate] {
178        self.entries.as_slice()
179    }
180
181    /// Return whether this patch carries no field updates.
182    #[must_use]
183    pub(in crate::db) const fn is_empty(&self) -> bool {
184        self.entries.is_empty()
185    }
186}
187
///
/// SerializedFieldUpdate
///
/// SerializedFieldUpdate carries one ordered field-level mutation after the
/// owning persisted-row field codec has already lowered the runtime `Value`
/// into canonical slot payload bytes.
/// This lets later patch-application stages consume one mechanical slot-patch
/// artifact instead of rebuilding per-field encode dispatch.
///

#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct SerializedFieldUpdate {
    // Target slot inside the owning entity model.
    slot: FieldSlot,
    // Already-encoded slot payload bytes; always present for this type.
    payload: Vec<u8>,
}

#[allow(dead_code)]
impl SerializedFieldUpdate {
    /// Build one serialized structural field update.
    ///
    /// The payload bytes are stored as given; no validation happens here.
    #[must_use]
    pub(in crate::db) const fn new(slot: FieldSlot, payload: Vec<u8>) -> Self {
        Self { slot, payload }
    }

    /// Return the stable target slot.
    #[must_use]
    pub(in crate::db) const fn slot(&self) -> FieldSlot {
        self.slot
    }

    /// Borrow the slot payload bytes carried by this update.
    #[must_use]
    pub(in crate::db) const fn payload(&self) -> &[u8] {
        self.payload.as_slice()
    }
}
227
///
/// SerializedUpdatePatch
///
/// SerializedUpdatePatch is the canonical serialized form of `UpdatePatch`
/// over persisted-row slot payload bytes.
/// This is the structural patch artifact later write-path stages can stage or
/// replay without re-entering field-contract encode logic.
///

#[allow(dead_code)]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SerializedUpdatePatch {
    // Ordered, already-encoded field updates.
    entries: Vec<SerializedFieldUpdate>,
}

#[allow(dead_code)]
impl SerializedUpdatePatch {
    /// Build one serialized patch from already encoded slot payloads.
    ///
    /// The entries are stored in the order given.
    #[must_use]
    pub(in crate::db) const fn new(entries: Vec<SerializedFieldUpdate>) -> Self {
        Self { entries }
    }

    /// Borrow the ordered serialized field updates carried by this patch.
    #[must_use]
    pub(in crate::db) const fn entries(&self) -> &[SerializedFieldUpdate] {
        self.entries.as_slice()
    }

    /// Return whether this serialized patch carries no field updates.
    #[must_use]
    pub(in crate::db) const fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
265
///
/// SlotReader
///
/// SlotReader exposes one persisted row as stable slot-addressable fields.
/// Callers may inspect field presence, borrow raw field bytes, or decode one
/// field value on demand.
///

pub trait SlotReader {
    /// Return the structural model that owns this slot mapping.
    fn model(&self) -> &'static EntityModel;

    /// Return whether the given slot is present in the persisted row.
    fn has(&self, slot: usize) -> bool;

    /// Borrow the raw persisted payload for one slot, or `None` when the slot
    /// is absent.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;

    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
    ///
    /// `Ok(None)` is the absent-slot outcome; decode failures surface as `Err`.
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;

    /// Decode one slot value on demand using the field contract declared by the model.
    ///
    /// NOTE(review): this is the only accessor taking `&mut self`, presumably
    /// so implementations can cache decoded values — confirm against the
    /// implementors.
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
}
290
291///
292/// CanonicalSlotReader
293///
294/// CanonicalSlotReader
295///
296/// CanonicalSlotReader is the stricter structural row-reader contract used
297/// once `0.65` canonical-row invariants are in force.
298/// Declared slots must already exist, so callers can fail closed on missing
299/// payloads instead of carrying absent-slot fallback branches.
300///
301
302pub(in crate::db) trait CanonicalSlotReader: SlotReader {
303    /// Borrow one declared slot payload, erroring when the persisted row is not canonical.
304    fn required_bytes(&self, slot: usize) -> Result<&[u8], InternalError> {
305        let field = field_model_for_slot(self.model(), slot)?;
306
307        self.get_bytes(slot)
308            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
309    }
310
311    /// Read one scalar slot through the structural fast path without allowing
312    /// declared-slot absence.
313    fn required_scalar(&self, slot: usize) -> Result<ScalarSlotValueRef<'_>, InternalError> {
314        let field = field_model_for_slot(self.model(), slot)?;
315        debug_assert!(matches!(field.leaf_codec(), LeafCodec::Scalar(_)));
316
317        self.get_scalar(slot)?
318            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
319    }
320
321    /// Decode one declared slot through the owning field contract without
322    /// allowing absent payloads.
323    fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
324        decode_slot_value_from_bytes(self.model(), slot, self.required_bytes(slot)?)
325    }
326}
327
328///
329/// SlotWriter
330///
331/// SlotWriter is the canonical row-container output seam used by persisted-row
332/// writers.
333///
334
335pub trait SlotWriter {
336    /// Record one slot payload for the current row.
337    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
338
339    /// Record one scalar slot payload using the canonical scalar leaf envelope.
340    fn write_scalar(
341        &mut self,
342        slot: usize,
343        value: ScalarSlotValueRef<'_>,
344    ) -> Result<(), InternalError> {
345        let payload = encode_scalar_slot_value(value);
346
347        self.write_slot(slot, Some(payload.as_slice()))
348    }
349}
350
351// Resolve one staged slot cell by layout index before writer-specific payload handling.
352fn slot_cell_mut<T>(slots: &mut [T], slot: usize) -> Result<&mut T, InternalError> {
353    slots.get_mut(slot).ok_or_else(|| {
354        InternalError::persisted_row_encode_failed(
355            format!("slot {slot} is outside the row layout",),
356        )
357    })
358}
359
360// Reject slot clears at the canonical slot-image staging boundary while keeping
361// writer-specific error wording at the call site.
362fn required_slot_payload_bytes<'a>(
363    model: &'static EntityModel,
364    writer_label: &str,
365    slot: usize,
366    payload: Option<&'a [u8]>,
367) -> Result<&'a [u8], InternalError> {
368    payload.ok_or_else(|| {
369        InternalError::persisted_row_encode_failed(format!(
370            "{writer_label} cannot clear slot {slot} for entity '{}'",
371            model.path()
372        ))
373    })
374}
375
376// Encode one fixed-width slot table plus concatenated slot payload bytes into
377// the canonical row payload container.
378fn encode_slot_payload_from_parts(
379    slot_count: usize,
380    slot_table: &[(u32, u32)],
381    payload_bytes: &[u8],
382) -> Result<Vec<u8>, InternalError> {
383    let field_count = u16::try_from(slot_count).map_err(|_| {
384        InternalError::persisted_row_encode_failed(format!(
385            "field count {slot_count} exceeds u16 slot table capacity",
386        ))
387    })?;
388    let mut encoded = Vec::with_capacity(
389        usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
390    );
391    encoded.extend_from_slice(&field_count.to_be_bytes());
392    for (start, len) in slot_table {
393        encoded.extend_from_slice(&start.to_be_bytes());
394        encoded.extend_from_slice(&len.to_be_bytes());
395    }
396    encoded.extend_from_slice(payload_bytes);
397
398    Ok(encoded)
399}
400
401///
402/// PersistedRow
403///
404/// PersistedRow is the derive-owned bridge between typed entities and
405/// slot-addressable persisted rows.
406/// It owns entity-specific materialization/default semantics while runtime
407/// paths stay structural at the row boundary.
408///
409
410pub trait PersistedRow: EntityKind + Sized {
411    /// Materialize one typed entity from one slot reader.
412    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
413
414    /// Write one typed entity into one slot writer.
415    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
416
417    /// Decode one slot value needed by structural planner/projection consumers.
418    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
419    where
420        Self: crate::traits::FieldProjection,
421    {
422        let entity = Self::materialize_from_slots(slots)?;
423
424        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
425    }
426}
427
/// Decode one slot value through the declared field contract without routing
/// through `SlotReader::get_value`.
///
/// Returns `Ok(None)` when the reader holds no bytes for `slot`.
#[cfg(test)]
pub(in crate::db) fn decode_slot_value_by_contract(
    slots: &dyn SlotReader,
    slot: usize,
) -> Result<Option<Value>, InternalError> {
    slots
        .get_bytes(slot)
        .map(|raw_value| decode_slot_value_from_bytes(slots.model(), slot, raw_value))
        .transpose()
}
441
442/// Decode one structural slot payload using the owning model field contract.
443///
444/// This is the canonical field-level decode boundary for persisted-row bytes.
445/// Higher-level row readers may still cache decoded values, but they should not
446/// rebuild scalar-vs-CBOR field dispatch themselves.
447pub(in crate::db) fn decode_slot_value_from_bytes(
448    model: &'static EntityModel,
449    slot: usize,
450    raw_value: &[u8],
451) -> Result<Value, InternalError> {
452    let field = field_model_for_slot(model, slot)?;
453
454    match field.leaf_codec() {
455        LeafCodec::Scalar(codec) => match decode_scalar_slot_value(raw_value, codec, field.name())?
456        {
457            ScalarSlotValueRef::Null => Ok(Value::Null),
458            ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
459        },
460        LeafCodec::CborFallback => decode_non_scalar_slot_value(raw_value, field),
461    }
462}
463
464/// Encode one structural slot value using the owning model field contract.
465///
466/// This is the initial `0.64` write-side field-codec boundary. It currently
467/// covers:
468/// - scalar leaf slots
469/// - `FieldStorageDecode::Value` slots
470///
471/// Composite `ByKind` field encoding remains a follow-up slice so the runtime
472/// can add one structural encoder owner instead of quietly rebuilding typed
473/// per-field branches.
474#[allow(dead_code)]
475pub(in crate::db) fn encode_slot_value_from_value(
476    model: &'static EntityModel,
477    slot: usize,
478    value: &Value,
479) -> Result<Vec<u8>, InternalError> {
480    let field = field_model_for_slot(model, slot)?;
481    ensure_slot_value_matches_field_contract(field, value)?;
482
483    match field.storage_decode() {
484        FieldStorageDecode::Value => serialize(value)
485            .map_err(|err| InternalError::persisted_row_field_encode_failed(field.name(), err)),
486        FieldStorageDecode::ByKind => match field.leaf_codec() {
487            LeafCodec::Scalar(_) => {
488                let scalar = compile_scalar_literal_expr_value(value).ok_or_else(|| {
489                    InternalError::persisted_row_field_encode_failed(
490                        field.name(),
491                        format!(
492                            "field kind {:?} requires a scalar runtime value, found {value:?}",
493                            field.kind()
494                        ),
495                    )
496                })?;
497
498                Ok(encode_scalar_slot_value(scalar.as_slot_value_ref()))
499            }
500            LeafCodec::CborFallback => {
501                encode_structural_field_bytes_by_kind(field.kind(), value, field.name())
502            }
503        },
504    }
505}
506
507// Decode one slot payload and immediately re-encode it through the current
508// field contract so every row-emission path normalizes bytes at the boundary.
509fn canonicalize_slot_payload(
510    model: &'static EntityModel,
511    slot: usize,
512    raw_value: &[u8],
513) -> Result<Vec<u8>, InternalError> {
514    let value = decode_slot_value_from_bytes(model, slot, raw_value)?;
515
516    encode_slot_value_from_value(model, slot, &value)
517}
518
519// Emit one raw row from a dense canonical slot image.
520fn emit_raw_row_from_slot_payloads(
521    model: &'static EntityModel,
522    slot_payloads: &[Vec<u8>],
523) -> Result<CanonicalRow, InternalError> {
524    if slot_payloads.len() != model.fields().len() {
525        return Err(InternalError::persisted_row_encode_failed(format!(
526            "canonical slot image expected {} slots for entity '{}', found {}",
527            model.fields().len(),
528            model.path(),
529            slot_payloads.len()
530        )));
531    }
532
533    let mut writer = SlotBufferWriter::for_model(model);
534
535    // Phase 1: write the already canonicalized dense slot image through the
536    // single row-container authority.
537    for (slot, payload) in slot_payloads.iter().enumerate() {
538        writer.write_slot(slot, Some(payload.as_slice()))?;
539    }
540
541    // Phase 2: wrap the canonical slot container in the shared row envelope.
542    let encoded = serialize_row_payload(writer.finish()?)?;
543    let raw_row = RawRow::from_untrusted_bytes(encoded).map_err(InternalError::from)?;
544
545    Ok(CanonicalRow::from_canonical_raw_row(raw_row))
546}
547
548// Build one dense canonical slot image from a serialized patch, failing closed
549// when any declared slot is missing or any payload is non-canonical.
550fn dense_canonical_slot_image_from_serialized_patch(
551    model: &'static EntityModel,
552    patch: &SerializedUpdatePatch,
553) -> Result<Vec<Vec<u8>>, InternalError> {
554    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
555    let mut slot_payloads = Vec::with_capacity(model.fields().len());
556
557    for (slot, payload) in patch_payloads.into_iter().enumerate() {
558        let payload = payload.ok_or_else(|| {
559            InternalError::persisted_row_encode_failed(format!(
560                "serialized patch did not emit slot {slot} for entity '{}'",
561                model.path()
562            ))
563        })?;
564        slot_payloads.push(canonicalize_slot_payload(model, slot, payload)?);
565    }
566
567    Ok(slot_payloads)
568}
569
570/// Build one canonical row from one serialized structural patch that already
571/// describes a full logical row image.
572pub(in crate::db) fn canonical_row_from_serialized_update_patch(
573    model: &'static EntityModel,
574    patch: &SerializedUpdatePatch,
575) -> Result<CanonicalRow, InternalError> {
576    let slot_payloads = dense_canonical_slot_image_from_serialized_patch(model, patch)?;
577
578    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
579}
580
581// Rebuild one full canonical row image from an existing raw row before it
582// crosses a storage write boundary.
583pub(in crate::db) fn canonical_row_from_raw_row(
584    model: &'static EntityModel,
585    raw_row: &RawRow,
586) -> Result<CanonicalRow, InternalError> {
587    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
588        .map_err(StructuralRowDecodeError::into_internal_error)?;
589    let mut slot_payloads = Vec::with_capacity(model.fields().len());
590
591    // Phase 1: canonicalize every declared slot from the existing row image.
592    for slot in 0..model.fields().len() {
593        let existing_payload = field_bytes.field(slot).ok_or_else(|| {
594            InternalError::persisted_row_encode_failed(format!(
595                "slot {slot} is missing from the baseline row for entity '{}'",
596                model.path()
597            ))
598        })?;
599        slot_payloads.push(canonicalize_slot_payload(model, slot, existing_payload)?);
600    }
601
602    // Phase 2: re-emit the full image through the single row-emission owner.
603    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
604}
605
// Rewrap one row already loaded from storage as a canonical write token.
// No decoding or re-canonicalization happens here: the row is wrapped as-is.
// NOTE(review): this assumes stored rows were canonical when written —
// confirm against the storage write paths.
pub(in crate::db) const fn canonical_row_from_stored_raw_row(raw_row: RawRow) -> CanonicalRow {
    CanonicalRow::from_canonical_raw_row(raw_row)
}
610
611/// Apply one ordered structural patch to one raw row using the current
612/// persisted-row field codec authority.
613#[allow(dead_code)]
614pub(in crate::db) fn apply_update_patch_to_raw_row(
615    model: &'static EntityModel,
616    raw_row: &RawRow,
617    patch: &UpdatePatch,
618) -> Result<CanonicalRow, InternalError> {
619    let serialized_patch = serialize_update_patch_fields(model, patch)?;
620
621    apply_serialized_update_patch_to_raw_row(model, raw_row, &serialized_patch)
622}
623
624/// Serialize one ordered structural patch into canonical slot payload bytes.
625///
626/// This is the phase-1 partial-serialization seam for `0.64`: later mutation
627/// stages can stage or replay one field patch without rebuilding the runtime
628/// value-to-bytes contract per consumer.
629#[allow(dead_code)]
630pub(in crate::db) fn serialize_update_patch_fields(
631    model: &'static EntityModel,
632    patch: &UpdatePatch,
633) -> Result<SerializedUpdatePatch, InternalError> {
634    if patch.is_empty() {
635        return Ok(SerializedUpdatePatch::default());
636    }
637
638    let mut entries = Vec::with_capacity(patch.entries().len());
639
640    // Phase 1: validate and encode each ordered field update through the
641    // canonical slot codec owner.
642    for entry in patch.entries() {
643        let slot = entry.slot();
644        let payload = encode_slot_value_from_value(model, slot.index(), entry.value())?;
645        entries.push(SerializedFieldUpdate::new(slot, payload));
646    }
647
648    Ok(SerializedUpdatePatch::new(entries))
649}
650
651/// Serialize one full typed entity image into the canonical serialized patch
652/// artifact used by row-boundary patch replay.
653///
654/// This keeps typed save/update APIs on the existing surface while moving the
655/// actual after-image staging onto the structural slot-patch boundary.
656#[allow(dead_code)]
657pub(in crate::db) fn serialize_entity_slots_as_update_patch<E>(
658    entity: &E,
659) -> Result<SerializedUpdatePatch, InternalError>
660where
661    E: PersistedRow,
662{
663    let mut writer = SerializedPatchWriter::for_model(E::MODEL);
664
665    // Phase 1: let the derive-owned persisted-row writer emit the complete
666    // structural slot image for this entity.
667    entity.write_slots(&mut writer)?;
668
669    // Phase 2: require a dense slot image so save/update replay remains
670    // equivalent to the existing full-row write semantics.
671    writer.finish_complete()
672}
673
674/// Apply one serialized structural patch to one raw row.
675///
676/// This mechanical replay step no longer owns any `Value -> bytes` dispatch.
677/// It only replays already encoded slot payloads over the current row layout.
678#[allow(dead_code)]
679pub(in crate::db) fn apply_serialized_update_patch_to_raw_row(
680    model: &'static EntityModel,
681    raw_row: &RawRow,
682    patch: &SerializedUpdatePatch,
683) -> Result<CanonicalRow, InternalError> {
684    if patch.is_empty() {
685        return canonical_row_from_raw_row(model, raw_row);
686    }
687
688    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
689        .map_err(StructuralRowDecodeError::into_internal_error)?;
690    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
691    let mut slot_payloads = Vec::with_capacity(model.fields().len());
692
693    // Phase 1: replay the current row layout slot-by-slot.
694    // Both patch and baseline bytes are normalized through the field contract
695    // so no opaque payload can cross into the emitted row image.
696    for (slot, patch_payload) in patch_payloads.iter().enumerate() {
697        if let Some(payload) = patch_payload {
698            slot_payloads.push(canonicalize_slot_payload(model, slot, payload)?);
699        } else {
700            let existing_payload = field_bytes.field(slot).ok_or_else(|| {
701                InternalError::persisted_row_encode_failed(format!(
702                    "slot {slot} is missing from the baseline row for entity '{}'",
703                    model.path()
704                ))
705            })?;
706            slot_payloads.push(canonicalize_slot_payload(model, slot, existing_payload)?);
707        }
708    }
709
710    // Phase 2: emit the rebuilt row through the single row-construction owner.
711    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
712}
713
714// Decode one non-scalar slot through the exact persisted contract declared by
715// the field model.
716fn decode_non_scalar_slot_value(
717    raw_value: &[u8],
718    field: &FieldModel,
719) -> Result<Value, InternalError> {
720    let decoded = match field.storage_decode() {
721        crate::model::field::FieldStorageDecode::ByKind => {
722            decode_structural_field_by_kind_bytes(raw_value, field.kind())
723        }
724        crate::model::field::FieldStorageDecode::Value => {
725            decode_structural_value_storage_bytes(raw_value)
726        }
727    };
728
729    decoded.map_err(|err| {
730        InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
731    })
732}
733
734// Validate one runtime value against the persisted field contract before field-
735// level structural encoding writes bytes into a row slot.
736#[allow(dead_code)]
737fn ensure_slot_value_matches_field_contract(
738    field: &FieldModel,
739    value: &Value,
740) -> Result<(), InternalError> {
741    if matches!(value, Value::Null) {
742        if field.nullable() {
743            return Ok(());
744        }
745
746        return Err(InternalError::persisted_row_field_encode_failed(
747            field.name(),
748            "required field cannot store null",
749        ));
750    }
751
752    // `FieldStorageDecode::Value` fields persist the generic `Value` envelope
753    // directly, so storage-side validation must accept structured leaves nested
754    // under collection contracts instead of reusing the predicate literal gate.
755    if matches!(field.storage_decode(), FieldStorageDecode::Value) {
756        if !storage_value_matches_field_kind(field.kind(), value) {
757            return Err(InternalError::persisted_row_field_encode_failed(
758                field.name(),
759                format!(
760                    "field kind {:?} does not accept runtime value {value:?}",
761                    field.kind()
762                ),
763            ));
764        }
765
766        ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
767
768        return ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value);
769    }
770
771    let field_type = field_type_from_model_kind(&field.kind());
772    if !literal_matches_type(value, &field_type) {
773        return Err(InternalError::persisted_row_field_encode_failed(
774            field.name(),
775            format!(
776                "field kind {:?} does not accept runtime value {value:?}",
777                field.kind()
778            ),
779        ));
780    }
781
782    ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
783    ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value)
784}
785
786// Match one runtime value against the semantic field kind used by value-backed
787// storage. Unlike predicate literals, non-queryable structured leaves are valid
788// persisted payloads when they arrive as canonical `Value::List` / `Value::Map`
789// shapes.
790fn storage_value_matches_field_kind(kind: FieldKind, value: &Value) -> bool {
791    match (kind, value) {
792        (FieldKind::Account, Value::Account(_))
793        | (FieldKind::Blob, Value::Blob(_))
794        | (FieldKind::Bool, Value::Bool(_))
795        | (FieldKind::Date, Value::Date(_))
796        | (FieldKind::Decimal { .. }, Value::Decimal(_))
797        | (FieldKind::Duration, Value::Duration(_))
798        | (FieldKind::Enum { .. }, Value::Enum(_))
799        | (FieldKind::Float32, Value::Float32(_))
800        | (FieldKind::Float64, Value::Float64(_))
801        | (FieldKind::Int, Value::Int(_))
802        | (FieldKind::Int128, Value::Int128(_))
803        | (FieldKind::IntBig, Value::IntBig(_))
804        | (FieldKind::Principal, Value::Principal(_))
805        | (FieldKind::Subaccount, Value::Subaccount(_))
806        | (FieldKind::Text, Value::Text(_))
807        | (FieldKind::Timestamp, Value::Timestamp(_))
808        | (FieldKind::Uint, Value::Uint(_))
809        | (FieldKind::Uint128, Value::Uint128(_))
810        | (FieldKind::UintBig, Value::UintBig(_))
811        | (FieldKind::Ulid, Value::Ulid(_))
812        | (FieldKind::Unit, Value::Unit)
813        | (FieldKind::Structured { .. }, Value::List(_) | Value::Map(_)) => true,
814        (FieldKind::Relation { key_kind, .. }, value) => {
815            storage_value_matches_field_kind(*key_kind, value)
816        }
817        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => items
818            .iter()
819            .all(|item| storage_value_matches_field_kind(*inner, item)),
820        (FieldKind::Map { key, value }, Value::Map(entries)) => {
821            if Value::validate_map_entries(entries.as_slice()).is_err() {
822                return false;
823            }
824
825            entries.iter().all(|(entry_key, entry_value)| {
826                storage_value_matches_field_kind(*key, entry_key)
827                    && storage_value_matches_field_kind(*value, entry_value)
828            })
829        }
830        _ => false,
831    }
832}
833
834// Enforce fixed decimal scales through nested collection/map shapes before a
835// field-level patch value is persisted.
836#[allow(dead_code)]
837fn ensure_decimal_scale_matches(
838    field_name: &str,
839    kind: FieldKind,
840    value: &Value,
841) -> Result<(), InternalError> {
842    if matches!(value, Value::Null) {
843        return Ok(());
844    }
845
846    match (kind, value) {
847        (FieldKind::Decimal { scale }, Value::Decimal(decimal)) => {
848            if decimal.scale() != scale {
849                return Err(InternalError::persisted_row_field_encode_failed(
850                    field_name,
851                    format!(
852                        "decimal scale mismatch: expected {scale}, found {}",
853                        decimal.scale()
854                    ),
855                ));
856            }
857
858            Ok(())
859        }
860        (FieldKind::Relation { key_kind, .. }, value) => {
861            ensure_decimal_scale_matches(field_name, *key_kind, value)
862        }
863        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
864            for item in items {
865                ensure_decimal_scale_matches(field_name, *inner, item)?;
866            }
867
868            Ok(())
869        }
870        (
871            FieldKind::Map {
872                key,
873                value: map_value,
874            },
875            Value::Map(entries),
876        ) => {
877            for (entry_key, entry_value) in entries {
878                ensure_decimal_scale_matches(field_name, *key, entry_key)?;
879                ensure_decimal_scale_matches(field_name, *map_value, entry_value)?;
880            }
881
882            Ok(())
883        }
884        _ => Ok(()),
885    }
886}
887
888// Enforce the canonical persisted ordering rules for set/map shapes before one
889// field-level patch value becomes row bytes.
890#[allow(dead_code)]
891fn ensure_value_is_deterministic_for_storage(
892    field_name: &str,
893    kind: FieldKind,
894    value: &Value,
895) -> Result<(), InternalError> {
896    match (kind, value) {
897        (FieldKind::Set(_), Value::List(items)) => {
898            for pair in items.windows(2) {
899                let [left, right] = pair else {
900                    continue;
901                };
902                if Value::canonical_cmp(left, right) != Ordering::Less {
903                    return Err(InternalError::persisted_row_field_encode_failed(
904                        field_name,
905                        "set payload must already be canonical and deduplicated",
906                    ));
907                }
908            }
909
910            Ok(())
911        }
912        (FieldKind::Map { .. }, Value::Map(entries)) => {
913            Value::validate_map_entries(entries.as_slice())
914                .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?;
915
916            if !Value::map_entries_are_strictly_canonical(entries.as_slice()) {
917                return Err(InternalError::persisted_row_field_encode_failed(
918                    field_name,
919                    "map payload must already be canonical and deduplicated",
920                ));
921            }
922
923            Ok(())
924        }
925        _ => Ok(()),
926    }
927}
928
929// Materialize the last-write-wins serialized patch view indexed by stable slot.
930fn serialized_patch_payload_by_slot<'a>(
931    model: &'static EntityModel,
932    patch: &'a SerializedUpdatePatch,
933) -> Result<Vec<Option<&'a [u8]>>, InternalError> {
934    let mut payloads = vec![None; model.fields().len()];
935
936    for entry in patch.entries() {
937        let slot = entry.slot().index();
938        field_model_for_slot(model, slot)?;
939        payloads[slot] = Some(entry.payload());
940    }
941
942    Ok(payloads)
943}
944
945// Encode one `ByKind` field payload into the raw CBOR shape expected by the
946// structural field decoder.
947fn encode_structural_field_bytes_by_kind(
948    kind: FieldKind,
949    value: &Value,
950    field_name: &str,
951) -> Result<Vec<u8>, InternalError> {
952    let cbor_value = encode_structural_field_cbor_by_kind(kind, value, field_name)?;
953
954    serialize(&cbor_value)
955        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
956}
957
958// Encode one `ByKind` field payload into its raw CBOR value form.
959fn encode_structural_field_cbor_by_kind(
960    kind: FieldKind,
961    value: &Value,
962    field_name: &str,
963) -> Result<CborValue, InternalError> {
964    match (kind, value) {
965        (_, Value::Null) => Ok(CborValue::Null),
966        (FieldKind::Blob, Value::Blob(value)) => Ok(CborValue::Bytes(value.clone())),
967        (FieldKind::Bool, Value::Bool(value)) => Ok(CborValue::Bool(*value)),
968        (FieldKind::Text, Value::Text(value)) => Ok(CborValue::Text(value.clone())),
969        (FieldKind::Int, Value::Int(value)) => Ok(CborValue::Integer(i128::from(*value))),
970        (FieldKind::Uint, Value::Uint(value)) => Ok(CborValue::Integer(i128::from(*value))),
971        (FieldKind::Float32, Value::Float32(value)) => to_cbor_value(value)
972            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
973        (FieldKind::Float64, Value::Float64(value)) => to_cbor_value(value)
974            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
975        (FieldKind::Int128, Value::Int128(value)) => to_cbor_value(value)
976            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
977        (FieldKind::Uint128, Value::Uint128(value)) => to_cbor_value(value)
978            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
979        (FieldKind::Ulid, Value::Ulid(value)) => Ok(CborValue::Text(value.to_string())),
980        (FieldKind::Account, Value::Account(value)) => encode_leaf_cbor_value(value, field_name),
981        (FieldKind::Date, Value::Date(value)) => encode_leaf_cbor_value(value, field_name),
982        (FieldKind::Decimal { .. }, Value::Decimal(value)) => {
983            encode_leaf_cbor_value(value, field_name)
984        }
985        (FieldKind::Duration, Value::Duration(value)) => encode_leaf_cbor_value(value, field_name),
986        (FieldKind::IntBig, Value::IntBig(value)) => encode_leaf_cbor_value(value, field_name),
987        (FieldKind::Principal, Value::Principal(value)) => {
988            encode_leaf_cbor_value(value, field_name)
989        }
990        (FieldKind::Subaccount, Value::Subaccount(value)) => {
991            encode_leaf_cbor_value(value, field_name)
992        }
993        (FieldKind::Timestamp, Value::Timestamp(value)) => {
994            encode_leaf_cbor_value(value, field_name)
995        }
996        (FieldKind::UintBig, Value::UintBig(value)) => encode_leaf_cbor_value(value, field_name),
997        (FieldKind::Unit, Value::Unit) => encode_leaf_cbor_value(&(), field_name),
998        (FieldKind::Relation { key_kind, .. }, value) => {
999            encode_structural_field_cbor_by_kind(*key_kind, value, field_name)
1000        }
1001        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
1002            Ok(CborValue::Array(
1003                items
1004                    .iter()
1005                    .map(|item| encode_structural_field_cbor_by_kind(*inner, item, field_name))
1006                    .collect::<Result<Vec<_>, _>>()?,
1007            ))
1008        }
1009        (FieldKind::Map { key, value }, Value::Map(entries)) => {
1010            let mut encoded = BTreeMap::new();
1011            for (entry_key, entry_value) in entries {
1012                encoded.insert(
1013                    encode_structural_field_cbor_by_kind(*key, entry_key, field_name)?,
1014                    encode_structural_field_cbor_by_kind(*value, entry_value, field_name)?,
1015                );
1016            }
1017
1018            Ok(CborValue::Map(encoded))
1019        }
1020        (FieldKind::Enum { path, variants }, Value::Enum(value)) => {
1021            encode_enum_cbor_value(path, variants, value, field_name)
1022        }
1023        (FieldKind::Structured { .. }, _) => Err(InternalError::persisted_row_field_encode_failed(
1024            field_name,
1025            "structured ByKind field encoding is unsupported",
1026        )),
1027        _ => Err(InternalError::persisted_row_field_encode_failed(
1028            field_name,
1029            format!("field kind {kind:?} does not accept runtime value {value:?}"),
1030        )),
1031    }
1032}
1033
1034// Encode one typed leaf wrapper into its raw CBOR value form.
1035fn encode_leaf_cbor_value<T>(value: &T, field_name: &str) -> Result<CborValue, InternalError>
1036where
1037    T: serde::Serialize,
1038{
1039    to_cbor_value(value)
1040        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
1041}
1042
1043// Encode one enum field using the same unit-vs-one-entry-map envelope expected
1044// by structural enum decode.
1045fn encode_enum_cbor_value(
1046    path: &'static str,
1047    variants: &'static [crate::model::field::EnumVariantModel],
1048    value: &ValueEnum,
1049    field_name: &str,
1050) -> Result<CborValue, InternalError> {
1051    if let Some(actual_path) = value.path()
1052        && actual_path != path
1053    {
1054        return Err(InternalError::persisted_row_field_encode_failed(
1055            field_name,
1056            format!("enum path mismatch: expected '{path}', found '{actual_path}'"),
1057        ));
1058    }
1059
1060    let Some(payload) = value.payload() else {
1061        return Ok(CborValue::Text(value.variant().to_string()));
1062    };
1063
1064    let Some(variant_model) = variants.iter().find(|item| item.ident() == value.variant()) else {
1065        return Err(InternalError::persisted_row_field_encode_failed(
1066            field_name,
1067            format!(
1068                "unknown enum variant '{}' for path '{path}'",
1069                value.variant()
1070            ),
1071        ));
1072    };
1073    let Some(payload_kind) = variant_model.payload_kind() else {
1074        return Err(InternalError::persisted_row_field_encode_failed(
1075            field_name,
1076            format!(
1077                "enum variant '{}' does not accept a payload",
1078                value.variant()
1079            ),
1080        ));
1081    };
1082
1083    let payload_value = match variant_model.payload_storage_decode() {
1084        FieldStorageDecode::ByKind => {
1085            encode_structural_field_cbor_by_kind(*payload_kind, payload, field_name)?
1086        }
1087        FieldStorageDecode::Value => to_cbor_value(payload)
1088            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?,
1089    };
1090
1091    let mut encoded = BTreeMap::new();
1092    encoded.insert(CborValue::Text(value.variant().to_string()), payload_value);
1093
1094    Ok(CborValue::Map(encoded))
1095}
1096
1097// Resolve one field model entry by stable slot index.
1098fn field_model_for_slot(
1099    model: &'static EntityModel,
1100    slot: usize,
1101) -> Result<&'static FieldModel, InternalError> {
1102    model
1103        .fields()
1104        .get(slot)
1105        .ok_or_else(|| InternalError::persisted_row_slot_lookup_out_of_bounds(model.path(), slot))
1106}
1107
///
/// ScalarValueRef
///
/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
/// slot-reader fast path.
/// It preserves cheap references for text/blob payloads while keeping fixed
/// width scalar wrappers as copy values.
/// The enum itself is `Copy`, so a view can be passed around by value.
///

#[derive(Clone, Copy, Debug)]
pub enum ScalarValueRef<'a> {
    /// Blob payload borrowed for `'a`.
    Blob(&'a [u8]),
    /// Boolean payload held by value.
    Bool(bool),
    /// Date payload held by value.
    Date(Date),
    /// Duration payload held by value.
    Duration(Duration),
    /// 32-bit float wrapper held by value.
    Float32(Float32),
    /// 64-bit float wrapper held by value.
    Float64(Float64),
    /// Signed 64-bit integer payload.
    Int(i64),
    /// Principal payload held by value.
    Principal(Principal),
    /// Subaccount payload held by value.
    Subaccount(Subaccount),
    /// Text payload borrowed for `'a`.
    Text(&'a str),
    /// Timestamp payload held by value.
    Timestamp(Timestamp),
    /// Unsigned 64-bit integer payload.
    Uint(u64),
    /// ULID payload held by value.
    Ulid(Ulid),
    /// Unit payload; carries no data.
    Unit,
}
1134
1135impl ScalarValueRef<'_> {
1136    /// Materialize this scalar view into the runtime `Value` enum.
1137    #[must_use]
1138    pub fn into_value(self) -> Value {
1139        match self {
1140            Self::Blob(value) => Value::Blob(value.to_vec()),
1141            Self::Bool(value) => Value::Bool(value),
1142            Self::Date(value) => Value::Date(value),
1143            Self::Duration(value) => Value::Duration(value),
1144            Self::Float32(value) => Value::Float32(value),
1145            Self::Float64(value) => Value::Float64(value),
1146            Self::Int(value) => Value::Int(value),
1147            Self::Principal(value) => Value::Principal(value),
1148            Self::Subaccount(value) => Value::Subaccount(value),
1149            Self::Text(value) => Value::Text(value.to_owned()),
1150            Self::Timestamp(value) => Value::Timestamp(value),
1151            Self::Uint(value) => Value::Uint(value),
1152            Self::Ulid(value) => Value::Ulid(value),
1153            Self::Unit => Value::Unit,
1154        }
1155    }
1156}
1157
///
/// ScalarSlotValueRef
///
/// ScalarSlotValueRef preserves the distinction between a missing slot and an
/// explicitly persisted `NULL` scalar payload.
/// The outer `Option` from `SlotReader::get_scalar` therefore still means
/// "slot absent".
///

#[derive(Clone, Copy, Debug)]
pub enum ScalarSlotValueRef<'a> {
    /// The slot holds an explicitly persisted `NULL`.
    Null,
    /// The slot holds a scalar payload, exposed as a borrowed-or-copy view.
    Value(ScalarValueRef<'a>),
}
1172
///
/// PersistedScalar
///
/// PersistedScalar defines the canonical binary payload codec for one scalar
/// leaf type.
/// Derive-generated persisted-row materializers and writers use this trait to
/// avoid routing scalar fields back through CBOR.
/// The payload handled here is the codec body only; the scalar slot envelope
/// is added and removed by the slot-level encode/decode helpers.
///

pub trait PersistedScalar: Sized {
    /// Canonical scalar codec identifier used by schema/runtime metadata.
    const CODEC: ScalarCodec;

    /// Encode this scalar value into its codec-specific payload bytes.
    ///
    /// # Errors
    ///
    /// Returns an `InternalError` when the implementor cannot encode the value.
    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;

    /// Decode this scalar value from its codec-specific payload bytes.
    ///
    /// `field_name` identifies the field being decoded so implementors can
    /// attribute failures to it.
    ///
    /// # Errors
    ///
    /// Returns an `InternalError` when `bytes` cannot be decoded as `Self`.
    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
    -> Result<Self, InternalError>;
}
1193
1194/// Encode one persisted slot payload using the shared leaf codec boundary.
1195pub fn encode_persisted_slot_payload<T>(
1196    value: &T,
1197    field_name: &'static str,
1198) -> Result<Vec<u8>, InternalError>
1199where
1200    T: serde::Serialize,
1201{
1202    serialize(value)
1203        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
1204}
1205
1206/// Encode one persisted scalar slot payload using the canonical scalar envelope.
1207pub fn encode_persisted_scalar_slot_payload<T>(
1208    value: &T,
1209    field_name: &'static str,
1210) -> Result<Vec<u8>, InternalError>
1211where
1212    T: PersistedScalar,
1213{
1214    let payload = value.encode_scalar_payload()?;
1215    let mut encoded = Vec::with_capacity(payload.len() + 2);
1216    encoded.push(SCALAR_SLOT_PREFIX);
1217    encoded.push(SCALAR_SLOT_TAG_VALUE);
1218    encoded.extend_from_slice(&payload);
1219
1220    if encoded.len() < 2 {
1221        return Err(InternalError::persisted_row_field_encode_failed(
1222            field_name,
1223            "scalar payload envelope underflow",
1224        ));
1225    }
1226
1227    Ok(encoded)
1228}
1229
1230/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
1231pub fn encode_persisted_option_scalar_slot_payload<T>(
1232    value: &Option<T>,
1233    field_name: &'static str,
1234) -> Result<Vec<u8>, InternalError>
1235where
1236    T: PersistedScalar,
1237{
1238    match value {
1239        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
1240        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
1241    }
1242}
1243
1244/// Decode one persisted slot payload using the shared leaf codec boundary.
1245pub fn decode_persisted_slot_payload<T>(
1246    bytes: &[u8],
1247    field_name: &'static str,
1248) -> Result<T, InternalError>
1249where
1250    T: serde::de::DeserializeOwned,
1251{
1252    deserialize(bytes)
1253        .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1254}
1255
1256/// Decode one non-null persisted slot payload through the shared leaf codec boundary.
1257pub fn decode_persisted_non_null_slot_payload<T>(
1258    bytes: &[u8],
1259    field_name: &'static str,
1260) -> Result<T, InternalError>
1261where
1262    T: serde::de::DeserializeOwned,
1263{
1264    if persisted_slot_payload_is_null(bytes) {
1265        return Err(InternalError::persisted_row_field_decode_failed(
1266            field_name,
1267            "unexpected null for non-nullable field",
1268        ));
1269    }
1270
1271    decode_persisted_slot_payload(bytes, field_name)
1272}
1273
1274/// Decode one optional persisted slot payload preserving explicit CBOR `NULL`.
1275pub fn decode_persisted_option_slot_payload<T>(
1276    bytes: &[u8],
1277    field_name: &'static str,
1278) -> Result<Option<T>, InternalError>
1279where
1280    T: serde::de::DeserializeOwned,
1281{
1282    if persisted_slot_payload_is_null(bytes) {
1283        return Ok(None);
1284    }
1285
1286    decode_persisted_slot_payload(bytes, field_name).map(Some)
1287}
1288
1289/// Decode one persisted custom-schema payload through `Value` and reconstruct
1290/// the typed field via `FieldValue`.
1291pub fn decode_persisted_custom_slot_payload<T>(
1292    bytes: &[u8],
1293    field_name: &'static str,
1294) -> Result<T, InternalError>
1295where
1296    T: FieldValue,
1297{
1298    let value = decode_persisted_slot_payload::<Value>(bytes, field_name)?;
1299
1300    T::from_value(&value).ok_or_else(|| {
1301        InternalError::persisted_row_field_decode_failed(
1302            field_name,
1303            format!(
1304                "value payload does not match {}",
1305                std::any::type_name::<T>()
1306            ),
1307        )
1308    })
1309}
1310
1311/// Decode one persisted repeated custom-schema payload through `Value::List`
1312/// and reconstruct each item via `FieldValue`.
1313pub fn decode_persisted_custom_many_slot_payload<T>(
1314    bytes: &[u8],
1315    field_name: &'static str,
1316) -> Result<Vec<T>, InternalError>
1317where
1318    T: FieldValue,
1319{
1320    let value = decode_persisted_slot_payload::<Value>(bytes, field_name)?;
1321
1322    field_value_vec_from_value::<T>(&value).ok_or_else(|| {
1323        InternalError::persisted_row_field_decode_failed(
1324            field_name,
1325            format!(
1326                "value payload does not match Vec<{}>",
1327                std::any::type_name::<T>()
1328            ),
1329        )
1330    })
1331}
1332
1333/// Encode one custom-schema field payload through its `FieldValue`
1334/// representation so structural decode preserves nested custom values.
1335pub fn encode_persisted_custom_slot_payload<T>(
1336    value: &T,
1337    field_name: &'static str,
1338) -> Result<Vec<u8>, InternalError>
1339where
1340    T: FieldValue,
1341{
1342    let value = value.to_value();
1343    encode_persisted_slot_payload(&value, field_name)
1344}
1345
1346/// Encode one repeated custom-schema payload through `Value::List`.
1347pub fn encode_persisted_custom_many_slot_payload<T>(
1348    values: &[T],
1349    field_name: &'static str,
1350) -> Result<Vec<u8>, InternalError>
1351where
1352    T: FieldValue,
1353{
1354    let value = Value::List(values.iter().map(FieldValue::to_value).collect());
1355    encode_persisted_slot_payload(&value, field_name)
1356}
1357
1358/// Decode one persisted scalar slot payload using the canonical scalar envelope.
1359pub fn decode_persisted_scalar_slot_payload<T>(
1360    bytes: &[u8],
1361    field_name: &'static str,
1362) -> Result<T, InternalError>
1363where
1364    T: PersistedScalar,
1365{
1366    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
1367        InternalError::persisted_row_field_decode_failed(
1368            field_name,
1369            "unexpected null for non-nullable scalar field",
1370        )
1371    })?;
1372
1373    T::decode_scalar_payload(payload, field_name)
1374}
1375
1376/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
1377pub fn decode_persisted_option_scalar_slot_payload<T>(
1378    bytes: &[u8],
1379    field_name: &'static str,
1380) -> Result<Option<T>, InternalError>
1381where
1382    T: PersistedScalar,
1383{
1384    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
1385        return Ok(None);
1386    };
1387
1388    T::decode_scalar_payload(payload, field_name).map(Some)
1389}
1390
1391// Detect the canonical persisted CBOR null payload used by optional structural slots.
1392fn persisted_slot_payload_is_null(bytes: &[u8]) -> bool {
1393    bytes == CBOR_NULL_PAYLOAD
1394}
1395
///
/// SlotBufferWriter
///
/// SlotBufferWriter captures one dense canonical row worth of slot payloads
/// before they are encoded into the canonical slot container.
///

pub(in crate::db) struct SlotBufferWriter {
    // Entity model whose declared fields define the slot layout.
    model: &'static EntityModel,
    // One cell per declared field, indexed by slot.
    slots: Vec<SlotBufferSlot>,
}
1407
1408impl SlotBufferWriter {
1409    /// Build one empty slot buffer for one entity model.
1410    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
1411        Self {
1412            model,
1413            slots: vec![SlotBufferSlot::Missing; model.fields().len()],
1414        }
1415    }
1416
1417    /// Encode the buffered slots into the canonical row payload.
1418    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
1419        let slot_count = self.slots.len();
1420        let mut payload_bytes = Vec::new();
1421        let mut slot_table = Vec::with_capacity(slot_count);
1422
1423        // Phase 1: require one payload for every declared slot before the row
1424        // can cross the canonical persisted-row boundary.
1425        for (slot, slot_payload) in self.slots.into_iter().enumerate() {
1426            match slot_payload {
1427                SlotBufferSlot::Set(bytes) => {
1428                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
1429                        InternalError::persisted_row_encode_failed(
1430                            "slot payload start exceeds u32 range",
1431                        )
1432                    })?;
1433                    let len = u32::try_from(bytes.len()).map_err(|_| {
1434                        InternalError::persisted_row_encode_failed(
1435                            "slot payload length exceeds u32 range",
1436                        )
1437                    })?;
1438                    payload_bytes.extend_from_slice(&bytes);
1439                    slot_table.push((start, len));
1440                }
1441                SlotBufferSlot::Missing => {
1442                    return Err(InternalError::persisted_row_encode_failed(format!(
1443                        "slot buffer writer did not emit slot {slot} for entity '{}'",
1444                        self.model.path()
1445                    )));
1446                }
1447            }
1448        }
1449
1450        // Phase 2: flatten the slot table plus payload bytes into the canonical row image.
1451        encode_slot_payload_from_parts(slot_count, slot_table.as_slice(), payload_bytes.as_slice())
1452    }
1453}
1454
1455impl SlotWriter for SlotBufferWriter {
1456    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1457        let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
1458        let payload = required_slot_payload_bytes(self.model, "slot buffer writer", slot, payload)?;
1459        *entry = SlotBufferSlot::Set(payload.to_vec());
1460
1461        Ok(())
1462    }
1463}
1464
///
/// SlotBufferSlot
///
/// SlotBufferSlot tracks whether one canonical row encoder has emitted a
/// payload for every declared slot before flattening the row payload.
///

#[derive(Clone, Debug, Eq, PartialEq)]
enum SlotBufferSlot {
    // No payload has been written for this slot yet.
    Missing,
    // Owned payload bytes written for this slot.
    Set(Vec<u8>),
}
1477
///
/// SerializedPatchWriter
///
/// SerializedPatchWriter captures a dense typed entity slot image into the
/// serialized patch artifact used by `0.64` mutation staging.
/// Unlike `SlotBufferWriter`, this writer does not flatten into one row payload;
/// it preserves slot-level ownership so later stages can replay the row through
/// the structural patch boundary.
///

struct SerializedPatchWriter {
    // Entity model whose declared fields define the slot layout.
    model: &'static EntityModel,
    // One cell per declared field, indexed by slot.
    slots: Vec<PatchWriterSlot>,
}
1494
1495impl SerializedPatchWriter {
1496    /// Build one empty serialized patch writer for one entity model.
1497    fn for_model(model: &'static EntityModel) -> Self {
1498        Self {
1499            model,
1500            slots: vec![PatchWriterSlot::Missing; model.fields().len()],
1501        }
1502    }
1503
1504    /// Materialize one dense serialized patch, erroring if the writer failed
1505    /// to emit any declared slot.
1506    fn finish_complete(self) -> Result<SerializedUpdatePatch, InternalError> {
1507        let mut entries = Vec::with_capacity(self.slots.len());
1508
1509        // Phase 1: require a complete slot image so typed save/update staging
1510        // stays equivalent to the existing full-row encoder.
1511        for (slot, payload) in self.slots.into_iter().enumerate() {
1512            let field_slot = FieldSlot::from_index(self.model, slot)?;
1513            let serialized = match payload {
1514                PatchWriterSlot::Set(payload) => SerializedFieldUpdate::new(field_slot, payload),
1515                PatchWriterSlot::Missing => {
1516                    return Err(InternalError::persisted_row_encode_failed(format!(
1517                        "serialized patch writer did not emit slot {slot} for entity '{}'",
1518                        self.model.path()
1519                    )));
1520                }
1521            };
1522            entries.push(serialized);
1523        }
1524
1525        Ok(SerializedUpdatePatch::new(entries))
1526    }
1527}
1528
1529impl SlotWriter for SerializedPatchWriter {
1530    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1531        let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
1532        let payload =
1533            required_slot_payload_bytes(self.model, "serialized patch writer", slot, payload)?;
1534        *entry = PatchWriterSlot::Set(payload.to_vec());
1535
1536        Ok(())
1537    }
1538}
1539
///
/// PatchWriterSlot
///
/// PatchWriterSlot tracks whether one dense slot-image writer has emitted a
/// payload or failed to visit the slot at all.
/// That lets the typed save/update bridge reject incomplete writers instead of
/// silently leaving stale bytes in the baseline row.
///

#[derive(Clone, Debug, Eq, PartialEq)]
enum PatchWriterSlot {
    // The writer has not visited this slot.
    Missing,
    // Owned payload bytes written for this slot.
    Set(Vec<u8>),
}
1556
///
/// StructuralSlotReader
///
/// StructuralSlotReader adapts the current persisted-row bytes into the
/// canonical slot-reader seam.
/// It validates row shape and fully decodes every declared field before any
/// consumer can observe the row, then keeps those decoded values cached so
/// later index/predicate reads do not re-run field decoders.
///

pub(in crate::db) struct StructuralSlotReader<'a> {
    // Entity model whose declared fields define the slot layout.
    model: &'static EntityModel,
    // Per-slot byte spans borrowed from the raw row for `'a`.
    field_bytes: StructuralRowFieldBytes<'a>,
    // One cache cell per declared field, indexed by slot.
    cached_values: Vec<CachedSlotValue>,
}
1572
1573impl<'a> StructuralSlotReader<'a> {
1574    /// Build one slot reader over one persisted row using the current structural row scanner.
1575    pub(in crate::db) fn from_raw_row(
1576        raw_row: &'a RawRow,
1577        model: &'static EntityModel,
1578    ) -> Result<Self, InternalError> {
1579        let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
1580            .map_err(StructuralRowDecodeError::into_internal_error)?;
1581        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
1582            .take(model.fields().len())
1583            .collect();
1584        let mut reader = Self {
1585            model,
1586            field_bytes,
1587            cached_values,
1588        };
1589
1590        // Phase 1: force every declared slot through the owned field decode
1591        // contract once so malformed but unreferenced payloads cannot stay
1592        // latent behind consumer-specific partial decode paths.
1593        reader.decode_all_declared_slots()?;
1594
1595        Ok(reader)
1596    }
1597
1598    /// Validate the decoded primary-key slot against the authoritative row key.
1599    pub(in crate::db) fn validate_storage_key(
1600        &self,
1601        data_key: &DataKey,
1602    ) -> Result<(), InternalError> {
1603        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
1604            return Err(InternalError::persisted_row_primary_key_field_missing(
1605                self.model.path(),
1606            ));
1607        };
1608        let field = self.field_model(primary_key_slot)?;
1609        let decoded_key = match self.get_scalar(primary_key_slot)? {
1610            Some(ScalarSlotValueRef::Null) => None,
1611            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
1612            None => Some(
1613                decode_storage_key_field_bytes(
1614                    self.required_field_bytes(primary_key_slot, field.name())?,
1615                    field.kind,
1616                )
1617                .map_err(|err| {
1618                    InternalError::persisted_row_primary_key_not_storage_encodable(data_key, err)
1619                })?,
1620            ),
1621        };
1622        let Some(decoded_key) = decoded_key else {
1623            return Err(InternalError::persisted_row_primary_key_slot_missing(
1624                data_key,
1625            ));
1626        };
1627        let expected_key = data_key.storage_key();
1628
1629        if decoded_key != expected_key {
1630            return Err(InternalError::persisted_row_key_mismatch(
1631                expected_key,
1632                decoded_key,
1633            ));
1634        }
1635
1636        Ok(())
1637    }
1638
1639    // Resolve one field model entry by stable slot index.
1640    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
1641        field_model_for_slot(self.model, slot)
1642    }
1643
1644    // Decode every declared slot exactly once at the structural row boundary so
1645    // later consumers inherit one globally enforced canonical-row contract.
1646    fn decode_all_declared_slots(&mut self) -> Result<(), InternalError> {
1647        for slot in 0..self.model.fields().len() {
1648            let _ = self.get_value(slot)?;
1649        }
1650
1651        Ok(())
1652    }
1653
1654    // Borrow one declared slot payload, treating absence as a persisted-row
1655    // invariant violation instead of a normal structural branch.
1656    pub(in crate::db) fn required_field_bytes(
1657        &self,
1658        slot: usize,
1659        field_name: &str,
1660    ) -> Result<&[u8], InternalError> {
1661        self.field_bytes
1662            .field(slot)
1663            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field_name))
1664    }
1665}
1666
1667// Convert one scalar slot fast-path value into its storage-key form when the
1668// field kind is storage-key-compatible.
1669const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
1670    match value {
1671        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
1672        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
1673        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
1674        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
1675        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
1676        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
1677        ScalarValueRef::Unit => Some(StorageKey::Unit),
1678        _ => None,
1679    }
1680}
1681
1682impl SlotReader for StructuralSlotReader<'_> {
1683    fn model(&self) -> &'static EntityModel {
1684        self.model
1685    }
1686
1687    fn has(&self, slot: usize) -> bool {
1688        self.field_bytes.field(slot).is_some()
1689    }
1690
1691    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
1692        self.field_bytes.field(slot)
1693    }
1694
1695    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
1696        let field = self.field_model(slot)?;
1697
1698        match field.leaf_codec() {
1699            LeafCodec::Scalar(codec) => decode_scalar_slot_value(
1700                self.required_field_bytes(slot, field.name())?,
1701                codec,
1702                field.name(),
1703            )
1704            .map(Some),
1705            LeafCodec::CborFallback => Ok(None),
1706        }
1707    }
1708
1709    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
1710        let cached = self.cached_values.get(slot).ok_or_else(|| {
1711            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(self.model.path(), slot)
1712        })?;
1713        if let CachedSlotValue::Decoded(value) = cached {
1714            return Ok(Some(value.clone()));
1715        }
1716
1717        let field = self.field_model(slot)?;
1718        let value = decode_slot_value_from_bytes(
1719            self.model,
1720            slot,
1721            self.required_field_bytes(slot, field.name())?,
1722        )?;
1723        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
1724
1725        Ok(Some(value))
1726    }
1727}
1728
// Marker impl: the structural reader opts into `CanonicalSlotReader` with an
// empty body, so it relies entirely on whatever the trait itself provides.
impl CanonicalSlotReader for StructuralSlotReader<'_> {}
1730
///
/// CachedSlotValue
///
/// CachedSlotValue tracks whether one slot has already been decoded during the
/// current structural row access pass.
///

enum CachedSlotValue {
    /// The slot has not been decoded yet; `get_value` decodes it on access.
    Pending,
    /// The slot was decoded; `get_value` returns a clone of this value
    /// instead of decoding the slot bytes again.
    Decoded(Value),
}
1742
1743// Encode one scalar slot value into the canonical prefixed scalar envelope.
1744fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
1745    match value {
1746        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
1747        ScalarSlotValueRef::Value(value) => {
1748            let mut encoded = Vec::new();
1749            encoded.push(SCALAR_SLOT_PREFIX);
1750            encoded.push(SCALAR_SLOT_TAG_VALUE);
1751
1752            match value {
1753                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
1754                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
1755                ScalarValueRef::Date(value) => {
1756                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
1757                }
1758                ScalarValueRef::Duration(value) => {
1759                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
1760                }
1761                ScalarValueRef::Float32(value) => {
1762                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
1763                }
1764                ScalarValueRef::Float64(value) => {
1765                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
1766                }
1767                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
1768                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
1769                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
1770                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
1771                ScalarValueRef::Timestamp(value) => {
1772                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
1773                }
1774                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
1775                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
1776                ScalarValueRef::Unit => {}
1777            }
1778
1779            encoded
1780        }
1781    }
1782}
1783
1784// Split one scalar slot envelope into `NULL` vs payload bytes.
1785fn decode_scalar_slot_payload_body<'a>(
1786    bytes: &'a [u8],
1787    field_name: &'static str,
1788) -> Result<Option<&'a [u8]>, InternalError> {
1789    let Some((&prefix, rest)) = bytes.split_first() else {
1790        return Err(InternalError::persisted_row_field_decode_failed(
1791            field_name,
1792            "empty scalar payload",
1793        ));
1794    };
1795    if prefix != SCALAR_SLOT_PREFIX {
1796        return Err(InternalError::persisted_row_field_decode_failed(
1797            field_name,
1798            format!(
1799                "scalar payload prefix mismatch: expected slot envelope prefix byte 0x{SCALAR_SLOT_PREFIX:02X}, found 0x{prefix:02X}",
1800            ),
1801        ));
1802    }
1803    let Some((&tag, payload)) = rest.split_first() else {
1804        return Err(InternalError::persisted_row_field_decode_failed(
1805            field_name,
1806            "truncated scalar payload tag",
1807        ));
1808    };
1809
1810    match tag {
1811        SCALAR_SLOT_TAG_NULL => {
1812            if !payload.is_empty() {
1813                return Err(InternalError::persisted_row_field_decode_failed(
1814                    field_name,
1815                    "null scalar payload has trailing bytes",
1816                ));
1817            }
1818
1819            Ok(None)
1820        }
1821        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
1822        _ => Err(InternalError::persisted_row_field_decode_failed(
1823            field_name,
1824            format!("invalid scalar payload tag {tag}"),
1825        )),
1826    }
1827}
1828
1829// Decode one scalar slot view using the field-declared scalar codec.
1830#[expect(clippy::too_many_lines)]
1831fn decode_scalar_slot_value<'a>(
1832    bytes: &'a [u8],
1833    codec: ScalarCodec,
1834    field_name: &'static str,
1835) -> Result<ScalarSlotValueRef<'a>, InternalError> {
1836    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
1837        return Ok(ScalarSlotValueRef::Null);
1838    };
1839
1840    let value = match codec {
1841        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
1842        ScalarCodec::Bool => {
1843            let [value] = payload else {
1844                return Err(
1845                    InternalError::persisted_row_field_payload_exact_len_required(
1846                        field_name,
1847                        "bool",
1848                        SCALAR_BOOL_PAYLOAD_LEN,
1849                    ),
1850                );
1851            };
1852            match *value {
1853                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
1854                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
1855                _ => {
1856                    return Err(InternalError::persisted_row_field_payload_invalid_byte(
1857                        field_name, "bool", *value,
1858                    ));
1859                }
1860            }
1861        }
1862        ScalarCodec::Date => {
1863            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1864                InternalError::persisted_row_field_payload_exact_len_required(
1865                    field_name,
1866                    "date",
1867                    SCALAR_WORD32_PAYLOAD_LEN,
1868                )
1869            })?;
1870            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
1871        }
1872        ScalarCodec::Duration => {
1873            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1874                InternalError::persisted_row_field_payload_exact_len_required(
1875                    field_name,
1876                    "duration",
1877                    SCALAR_WORD64_PAYLOAD_LEN,
1878                )
1879            })?;
1880            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
1881        }
1882        ScalarCodec::Float32 => {
1883            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1884                InternalError::persisted_row_field_payload_exact_len_required(
1885                    field_name,
1886                    "float32",
1887                    SCALAR_WORD32_PAYLOAD_LEN,
1888                )
1889            })?;
1890            let value = f32::from_bits(u32::from_le_bytes(bytes));
1891            let value = Float32::try_new(value).ok_or_else(|| {
1892                InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
1893            })?;
1894            ScalarValueRef::Float32(value)
1895        }
1896        ScalarCodec::Float64 => {
1897            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1898                InternalError::persisted_row_field_payload_exact_len_required(
1899                    field_name,
1900                    "float64",
1901                    SCALAR_WORD64_PAYLOAD_LEN,
1902                )
1903            })?;
1904            let value = f64::from_bits(u64::from_le_bytes(bytes));
1905            let value = Float64::try_new(value).ok_or_else(|| {
1906                InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
1907            })?;
1908            ScalarValueRef::Float64(value)
1909        }
1910        ScalarCodec::Int64 => {
1911            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1912                InternalError::persisted_row_field_payload_exact_len_required(
1913                    field_name,
1914                    "int",
1915                    SCALAR_WORD64_PAYLOAD_LEN,
1916                )
1917            })?;
1918            ScalarValueRef::Int(i64::from_le_bytes(bytes))
1919        }
1920        ScalarCodec::Principal => ScalarValueRef::Principal(
1921            Principal::try_from_bytes(payload)
1922                .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))?,
1923        ),
1924        ScalarCodec::Subaccount => {
1925            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1926                InternalError::persisted_row_field_payload_exact_len_required(
1927                    field_name,
1928                    "subaccount",
1929                    SCALAR_SUBACCOUNT_PAYLOAD_LEN,
1930                )
1931            })?;
1932            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
1933        }
1934        ScalarCodec::Text => {
1935            let value = str::from_utf8(payload).map_err(|err| {
1936                InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
1937            })?;
1938            ScalarValueRef::Text(value)
1939        }
1940        ScalarCodec::Timestamp => {
1941            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1942                InternalError::persisted_row_field_payload_exact_len_required(
1943                    field_name,
1944                    "timestamp",
1945                    SCALAR_WORD64_PAYLOAD_LEN,
1946                )
1947            })?;
1948            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
1949        }
1950        ScalarCodec::Uint64 => {
1951            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1952                InternalError::persisted_row_field_payload_exact_len_required(
1953                    field_name,
1954                    "uint",
1955                    SCALAR_WORD64_PAYLOAD_LEN,
1956                )
1957            })?;
1958            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
1959        }
1960        ScalarCodec::Ulid => {
1961            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1962                InternalError::persisted_row_field_payload_exact_len_required(
1963                    field_name,
1964                    "ulid",
1965                    SCALAR_ULID_PAYLOAD_LEN,
1966                )
1967            })?;
1968            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
1969        }
1970        ScalarCodec::Unit => {
1971            if !payload.is_empty() {
1972                return Err(InternalError::persisted_row_field_payload_must_be_empty(
1973                    field_name, "unit",
1974                ));
1975            }
1976            ScalarValueRef::Unit
1977        }
1978    };
1979
1980    Ok(ScalarSlotValueRef::Value(value))
1981}
1982
// Implement `PersistedScalar` for signed integer types by widening to `i64`
// on encode (8 little-endian bytes) and narrowing back with a range check on
// decode.
macro_rules! impl_persisted_scalar_signed {
    ($($int:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $int {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "int",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = i64::from_le_bytes(raw);

                    // Stored values that do not fit the target width are rejected.
                    <$int>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "integer",
                        )
                    })
                }
            }
        )*
    };
}
2015
// Implement `PersistedScalar` for unsigned integer types by widening to `u64`
// on encode (8 little-endian bytes) and narrowing back with a range check on
// decode.
macro_rules! impl_persisted_scalar_unsigned {
    ($($uint:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $uint {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "uint",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = u64::from_le_bytes(raw);

                    // Stored values that do not fit the target width are rejected.
                    <$uint>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "unsigned",
                        )
                    })
                }
            }
        )*
    };
}
2048
// Fixed-width primitive integers persist through the shared 64-bit codecs:
// signed types use `ScalarCodec::Int64`, unsigned types `ScalarCodec::Uint64`.
impl_persisted_scalar_signed!(i8, i16, i32, i64);
impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
2051
2052impl PersistedScalar for bool {
2053    const CODEC: ScalarCodec = ScalarCodec::Bool;
2054
2055    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2056        Ok(vec![u8::from(*self)])
2057    }
2058
2059    fn decode_scalar_payload(
2060        bytes: &[u8],
2061        field_name: &'static str,
2062    ) -> Result<Self, InternalError> {
2063        let [value] = bytes else {
2064            return Err(
2065                InternalError::persisted_row_field_payload_exact_len_required(
2066                    field_name,
2067                    "bool",
2068                    SCALAR_BOOL_PAYLOAD_LEN,
2069                ),
2070            );
2071        };
2072
2073        match *value {
2074            SCALAR_BOOL_FALSE_TAG => Ok(false),
2075            SCALAR_BOOL_TRUE_TAG => Ok(true),
2076            _ => Err(InternalError::persisted_row_field_payload_invalid_byte(
2077                field_name, "bool", *value,
2078            )),
2079        }
2080    }
2081}
2082
2083impl PersistedScalar for String {
2084    const CODEC: ScalarCodec = ScalarCodec::Text;
2085
2086    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2087        Ok(self.as_bytes().to_vec())
2088    }
2089
2090    fn decode_scalar_payload(
2091        bytes: &[u8],
2092        field_name: &'static str,
2093    ) -> Result<Self, InternalError> {
2094        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
2095            InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
2096        })
2097    }
2098}
2099
2100impl PersistedScalar for Vec<u8> {
2101    const CODEC: ScalarCodec = ScalarCodec::Blob;
2102
2103    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2104        Ok(self.clone())
2105    }
2106
2107    fn decode_scalar_payload(
2108        bytes: &[u8],
2109        _field_name: &'static str,
2110    ) -> Result<Self, InternalError> {
2111        Ok(bytes.to_vec())
2112    }
2113}
2114
2115impl PersistedScalar for Blob {
2116    const CODEC: ScalarCodec = ScalarCodec::Blob;
2117
2118    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2119        Ok(self.to_vec())
2120    }
2121
2122    fn decode_scalar_payload(
2123        bytes: &[u8],
2124        _field_name: &'static str,
2125    ) -> Result<Self, InternalError> {
2126        Ok(Self::from(bytes))
2127    }
2128}
2129
2130impl PersistedScalar for Ulid {
2131    const CODEC: ScalarCodec = ScalarCodec::Ulid;
2132
2133    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2134        Ok(self.to_bytes().to_vec())
2135    }
2136
2137    fn decode_scalar_payload(
2138        bytes: &[u8],
2139        field_name: &'static str,
2140    ) -> Result<Self, InternalError> {
2141        Self::try_from_bytes(bytes)
2142            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
2143    }
2144}
2145
2146impl PersistedScalar for Timestamp {
2147    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
2148
2149    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2150        Ok(self.as_millis().to_le_bytes().to_vec())
2151    }
2152
2153    fn decode_scalar_payload(
2154        bytes: &[u8],
2155        field_name: &'static str,
2156    ) -> Result<Self, InternalError> {
2157        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2158            InternalError::persisted_row_field_payload_exact_len_required(
2159                field_name,
2160                "timestamp",
2161                SCALAR_WORD64_PAYLOAD_LEN,
2162            )
2163        })?;
2164
2165        Ok(Self::from_millis(i64::from_le_bytes(raw)))
2166    }
2167}
2168
2169impl PersistedScalar for Date {
2170    const CODEC: ScalarCodec = ScalarCodec::Date;
2171
2172    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2173        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
2174    }
2175
2176    fn decode_scalar_payload(
2177        bytes: &[u8],
2178        field_name: &'static str,
2179    ) -> Result<Self, InternalError> {
2180        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2181            InternalError::persisted_row_field_payload_exact_len_required(
2182                field_name,
2183                "date",
2184                SCALAR_WORD32_PAYLOAD_LEN,
2185            )
2186        })?;
2187
2188        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
2189    }
2190}
2191
2192impl PersistedScalar for Duration {
2193    const CODEC: ScalarCodec = ScalarCodec::Duration;
2194
2195    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2196        Ok(self.as_millis().to_le_bytes().to_vec())
2197    }
2198
2199    fn decode_scalar_payload(
2200        bytes: &[u8],
2201        field_name: &'static str,
2202    ) -> Result<Self, InternalError> {
2203        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2204            InternalError::persisted_row_field_payload_exact_len_required(
2205                field_name,
2206                "duration",
2207                SCALAR_WORD64_PAYLOAD_LEN,
2208            )
2209        })?;
2210
2211        Ok(Self::from_millis(u64::from_le_bytes(raw)))
2212    }
2213}
2214
2215impl PersistedScalar for Float32 {
2216    const CODEC: ScalarCodec = ScalarCodec::Float32;
2217
2218    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2219        Ok(self.get().to_bits().to_le_bytes().to_vec())
2220    }
2221
2222    fn decode_scalar_payload(
2223        bytes: &[u8],
2224        field_name: &'static str,
2225    ) -> Result<Self, InternalError> {
2226        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2227            InternalError::persisted_row_field_payload_exact_len_required(
2228                field_name,
2229                "float32",
2230                SCALAR_WORD32_PAYLOAD_LEN,
2231            )
2232        })?;
2233        let value = f32::from_bits(u32::from_le_bytes(raw));
2234
2235        Self::try_new(value).ok_or_else(|| {
2236            InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
2237        })
2238    }
2239}
2240
2241impl PersistedScalar for Float64 {
2242    const CODEC: ScalarCodec = ScalarCodec::Float64;
2243
2244    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2245        Ok(self.get().to_bits().to_le_bytes().to_vec())
2246    }
2247
2248    fn decode_scalar_payload(
2249        bytes: &[u8],
2250        field_name: &'static str,
2251    ) -> Result<Self, InternalError> {
2252        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2253            InternalError::persisted_row_field_payload_exact_len_required(
2254                field_name,
2255                "float64",
2256                SCALAR_WORD64_PAYLOAD_LEN,
2257            )
2258        })?;
2259        let value = f64::from_bits(u64::from_le_bytes(raw));
2260
2261        Self::try_new(value).ok_or_else(|| {
2262            InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
2263        })
2264    }
2265}
2266
2267impl PersistedScalar for Principal {
2268    const CODEC: ScalarCodec = ScalarCodec::Principal;
2269
2270    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2271        self.to_bytes()
2272            .map_err(|err| InternalError::persisted_row_field_encode_failed("principal", err))
2273    }
2274
2275    fn decode_scalar_payload(
2276        bytes: &[u8],
2277        field_name: &'static str,
2278    ) -> Result<Self, InternalError> {
2279        Self::try_from_bytes(bytes)
2280            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
2281    }
2282}
2283
2284impl PersistedScalar for Subaccount {
2285    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
2286
2287    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2288        Ok(self.to_bytes().to_vec())
2289    }
2290
2291    fn decode_scalar_payload(
2292        bytes: &[u8],
2293        field_name: &'static str,
2294    ) -> Result<Self, InternalError> {
2295        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2296            InternalError::persisted_row_field_payload_exact_len_required(
2297                field_name,
2298                "subaccount",
2299                SCALAR_SUBACCOUNT_PAYLOAD_LEN,
2300            )
2301        })?;
2302
2303        Ok(Self::from_array(raw))
2304    }
2305}
2306
2307impl PersistedScalar for () {
2308    const CODEC: ScalarCodec = ScalarCodec::Unit;
2309
2310    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2311        Ok(Vec::new())
2312    }
2313
2314    fn decode_scalar_payload(
2315        bytes: &[u8],
2316        field_name: &'static str,
2317    ) -> Result<Self, InternalError> {
2318        if !bytes.is_empty() {
2319            return Err(InternalError::persisted_row_field_payload_must_be_empty(
2320                field_name, "unit",
2321            ));
2322        }
2323
2324        Ok(())
2325    }
2326}
2327
2328impl PersistedScalar for Unit {
2329    const CODEC: ScalarCodec = ScalarCodec::Unit;
2330
2331    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2332        Ok(Vec::new())
2333    }
2334
2335    fn decode_scalar_payload(
2336        bytes: &[u8],
2337        field_name: &'static str,
2338    ) -> Result<Self, InternalError> {
2339        if !bytes.is_empty() {
2340            return Err(InternalError::persisted_row_field_payload_must_be_empty(
2341                field_name, "unit",
2342            ));
2343        }
2344
2345        Ok(Self)
2346    }
2347}
2348
2349///
2350/// TESTS
2351///
2352
2353#[cfg(test)]
2354mod tests {
2355    use super::{
2356        FieldSlot, ScalarSlotValueRef, ScalarValueRef, SerializedFieldUpdate,
2357        SerializedPatchWriter, SerializedUpdatePatch, SlotBufferWriter, SlotReader, SlotWriter,
2358        UpdatePatch, apply_serialized_update_patch_to_raw_row, apply_update_patch_to_raw_row,
2359        decode_persisted_custom_many_slot_payload, decode_persisted_custom_slot_payload,
2360        decode_persisted_non_null_slot_payload, decode_persisted_option_slot_payload,
2361        decode_persisted_slot_payload, decode_slot_value_by_contract, decode_slot_value_from_bytes,
2362        encode_persisted_custom_many_slot_payload, encode_persisted_custom_slot_payload,
2363        encode_scalar_slot_value, encode_slot_payload_from_parts, encode_slot_value_from_value,
2364        serialize_entity_slots_as_update_patch, serialize_update_patch_fields,
2365    };
2366    use crate::{
2367        db::{
2368            codec::serialize_row_payload,
2369            data::{RawRow, StructuralSlotReader},
2370        },
2371        error::InternalError,
2372        model::{
2373            EntityModel,
2374            field::{EnumVariantModel, FieldKind, FieldModel, FieldStorageDecode},
2375        },
2376        testing::SIMPLE_ENTITY_TAG,
2377        traits::{EntitySchema, FieldValue},
2378        types::{Account, Principal, Subaccount},
2379        value::{Value, ValueEnum},
2380    };
2381    use icydb_derive::{FieldProjection, PersistedRow};
2382    use serde::{Deserialize, Serialize};
2383
    // Test-only canister declaration referenced by the patch-bridge store and
    // entity schema in this module.
    crate::test_canister! {
        ident = PersistedRowPatchBridgeCanister,
        commit_memory_id = crate::testing::test_commit_memory_id(),
    }

    // Test-only store declaration bound to `PersistedRowPatchBridgeCanister`.
    crate::test_store! {
        ident = PersistedRowPatchBridgeStore,
        canister = PersistedRowPatchBridgeCanister,
    }
2393
    ///
    /// PersistedRowPatchBridgeEntity
    ///
    /// PersistedRowPatchBridgeEntity is the smallest derive-owned entity used
    /// to validate the typed-entity -> serialized-patch bridge.
    /// It lets the persisted-row tests exercise the same dense slot writer the
    /// save/update path now uses.
    ///

    #[derive(
        Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, PersistedRow, Serialize,
    )]
    struct PersistedRowPatchBridgeEntity {
        id: crate::types::Ulid,
        name: String,
    }
2412
    ///
    /// PersistedRowProfileValue
    ///
    /// PersistedRowProfileValue is a test-only structured value with a single
    /// `bio` text field; its `FieldValue` impl maps it to a one-entry map.
    ///

    #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
    struct PersistedRowProfileValue {
        bio: String,
    }
2417
2418    impl FieldValue for PersistedRowProfileValue {
2419        fn kind() -> crate::traits::FieldValueKind {
2420            crate::traits::FieldValueKind::Structured { queryable: false }
2421        }
2422
2423        fn to_value(&self) -> Value {
2424            Value::from_map(vec![(
2425                Value::Text("bio".to_string()),
2426                Value::Text(self.bio.clone()),
2427            )])
2428            .expect("profile test value should encode as canonical map")
2429        }
2430
2431        fn from_value(value: &Value) -> Option<Self> {
2432            let Value::Map(entries) = value else {
2433                return None;
2434            };
2435            let normalized = Value::normalize_map_entries(entries.clone()).ok()?;
2436            let bio = normalized
2437                .iter()
2438                .find_map(|(entry_key, entry_value)| match entry_key {
2439                    Value::Text(entry_key) if entry_key == "bio" => match entry_value {
2440                        Value::Text(bio) => Some(bio.clone()),
2441                        _ => None,
2442                    },
2443                    _ => None,
2444                })?;
2445
2446            if normalized.len() != 1 {
2447                return None;
2448            }
2449
2450            Some(Self { bio })
2451        }
2452    }
2453
    // Schema wiring for `PersistedRowPatchBridgeEntity`: `id` (Ulid) is the
    // id field with `pk_index = 0`, followed by `name` (Text). No indexes are
    // declared, and the entity is bound to the test store and canister
    // declared in this module.
    crate::test_entity_schema! {
        ident = PersistedRowPatchBridgeEntity,
        id = crate::types::Ulid,
        id_field = id,
        entity_name = "PersistedRowPatchBridgeEntity",
        entity_tag = SIMPLE_ENTITY_TAG,
        pk_index = 0,
        fields = [
            ("id", FieldKind::Ulid),
            ("name", FieldKind::Text),
        ],
        indexes = [],
        store = PersistedRowPatchBridgeStore,
        canister = PersistedRowPatchBridgeCanister,
    }
2469
    // Variant table for the `tests::State` enum fixture: a single `Loaded`
    // variant declared with a `Uint` kind and by-kind storage decode.
    static STATE_VARIANTS: &[EnumVariantModel] = &[EnumVariantModel::new(
        "Loaded",
        Some(&FieldKind::Uint),
        FieldStorageDecode::ByKind,
    )];
    // Two text fields for `TEST_MODEL`: slot 0 (`name`) uses the default field
    // constructor, slot 1 (`payload`) opts into `FieldStorageDecode::Value`.
    static FIELD_MODELS: [FieldModel; 2] = [
        FieldModel::new("name", FieldKind::Text),
        FieldModel::new_with_storage_decode("payload", FieldKind::Text, FieldStorageDecode::Value),
    ];
    // Single list-of-text field (`tags`).
    static LIST_FIELD_MODELS: [FieldModel; 1] =
        [FieldModel::new("tags", FieldKind::List(&FieldKind::Text))];
    // Single map field (`props`) with text keys and uint values.
    static MAP_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
        "props",
        FieldKind::Map {
            key: &FieldKind::Text,
            value: &FieldKind::Uint,
        },
    )];
    // Single enum field (`state`) backed by `STATE_VARIANTS`.
    static ENUM_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
        "state",
        FieldKind::Enum {
            path: "tests::State",
            variants: STATE_VARIANTS,
        },
    )];
    // Single account field (`owner`).
    static ACCOUNT_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new("owner", FieldKind::Account)];
    // Structured `profile` field built with the default constructor; the tests
    // below expect this field to reject `Value::Null`.
    static REQUIRED_STRUCTURED_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
        "profile",
        FieldKind::Structured { queryable: false },
    )];
    // Structured `profile` field built with the trailing flag set to `true`;
    // the tests below expect this field to accept `Value::Null`.
    static OPTIONAL_STRUCTURED_FIELD_MODELS: [FieldModel; 1] =
        [FieldModel::new_with_storage_decode_and_nullability(
            "profile",
            FieldKind::Structured { queryable: false },
            FieldStorageDecode::ByKind,
            true,
        )];
    // Value kind shared by the structured-map fixture; kept as its own static so
    // the map field below can hold a `'static` reference to it.
    static STRUCTURED_MAP_VALUE_KIND: FieldKind = FieldKind::Structured { queryable: false };
    // Map field (`projects`) from principal keys to structured values, stored
    // through `FieldStorageDecode::Value`.
    static STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS: [FieldModel; 1] =
        [FieldModel::new_with_storage_decode(
            "projects",
            FieldKind::Map {
                key: &FieldKind::Principal,
                value: &STRUCTURED_MAP_VALUE_KIND,
            },
            FieldStorageDecode::Value,
        )];
    // Empty index list shared by every test entity model in this module.
    static INDEX_MODELS: [&crate::model::index::IndexModel; 0] = [];
    // Entity models wrapping the field fixtures above. Each model passes two
    // name strings, a reference to the first field of its field array, the full
    // field array, and the shared empty index list.
    // NOTE(review): the third argument is presumably the primary-key field —
    // confirm against `EntityModel::new`.

    // Two-slot model: scalar `name` (slot 0) and value-storage `payload` (slot 1).
    static TEST_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowFieldCodecEntity",
        "persisted_row_field_codec_entity",
        &FIELD_MODELS[0],
        &FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the list field fixture.
    static LIST_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowListFieldCodecEntity",
        "persisted_row_list_field_codec_entity",
        &LIST_FIELD_MODELS[0],
        &LIST_FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the map field fixture.
    static MAP_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowMapFieldCodecEntity",
        "persisted_row_map_field_codec_entity",
        &MAP_FIELD_MODELS[0],
        &MAP_FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the enum field fixture.
    static ENUM_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowEnumFieldCodecEntity",
        "persisted_row_enum_field_codec_entity",
        &ENUM_FIELD_MODELS[0],
        &ENUM_FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the account field fixture.
    static ACCOUNT_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowAccountFieldCodecEntity",
        "persisted_row_account_field_codec_entity",
        &ACCOUNT_FIELD_MODELS[0],
        &ACCOUNT_FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the required structured field fixture.
    static REQUIRED_STRUCTURED_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowRequiredStructuredFieldCodecEntity",
        "persisted_row_required_structured_field_codec_entity",
        &REQUIRED_STRUCTURED_FIELD_MODELS[0],
        &REQUIRED_STRUCTURED_FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the optional structured field fixture.
    static OPTIONAL_STRUCTURED_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowOptionalStructuredFieldCodecEntity",
        "persisted_row_optional_structured_field_codec_entity",
        &OPTIONAL_STRUCTURED_FIELD_MODELS[0],
        &OPTIONAL_STRUCTURED_FIELD_MODELS,
        &INDEX_MODELS,
    );
    // One-slot model over the structured-map value-storage field fixture.
    static STRUCTURED_MAP_VALUE_STORAGE_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowStructuredMapValueStorageEntity",
        "persisted_row_structured_map_value_storage_entity",
        &STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS[0],
        &STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS,
        &INDEX_MODELS,
    );
2574
2575    fn encode_slot_payload_allowing_missing_for_tests(
2576        model: &'static EntityModel,
2577        slots: &[Option<&[u8]>],
2578    ) -> Result<Vec<u8>, InternalError> {
2579        if slots.len() != model.fields().len() {
2580            return Err(InternalError::persisted_row_encode_failed(format!(
2581                "noncanonical slot payload test helper expected {} slots for entity '{}', found {}",
2582                model.fields().len(),
2583                model.path(),
2584                slots.len()
2585            )));
2586        }
2587        let mut payload_bytes = Vec::new();
2588        let mut slot_table = Vec::with_capacity(slots.len());
2589
2590        for slot_payload in slots {
2591            match slot_payload {
2592                Some(bytes) => {
2593                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
2594                        InternalError::persisted_row_encode_failed(
2595                            "slot payload start exceeds u32 range",
2596                        )
2597                    })?;
2598                    let len = u32::try_from(bytes.len()).map_err(|_| {
2599                        InternalError::persisted_row_encode_failed(
2600                            "slot payload length exceeds u32 range",
2601                        )
2602                    })?;
2603                    payload_bytes.extend_from_slice(bytes);
2604                    slot_table.push((start, len));
2605                }
2606                None => slot_table.push((0_u32, 0_u32)),
2607            }
2608        }
2609
2610        encode_slot_payload_from_parts(slots.len(), slot_table.as_slice(), payload_bytes.as_slice())
2611    }
2612
2613    #[test]
2614    fn decode_slot_value_from_bytes_decodes_scalar_slots_through_one_owner() {
2615        let payload =
2616            encode_scalar_slot_value(ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")));
2617        let value =
2618            decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
2619
2620        assert_eq!(value, Value::Text("Ada".to_string()));
2621    }
2622
2623    #[test]
2624    fn decode_slot_value_from_bytes_reports_scalar_prefix_bytes() {
2625        let err = decode_slot_value_from_bytes(&TEST_MODEL, 0, &[0x00, 1])
2626            .expect_err("invalid scalar slot prefix should fail closed");
2627
2628        assert!(
2629            err.message
2630                .contains("expected slot envelope prefix byte 0xFF, found 0x00"),
2631            "unexpected error: {err:?}"
2632        );
2633    }
2634
2635    #[test]
2636    fn decode_slot_value_from_bytes_respects_value_storage_decode_contract() {
2637        let payload = crate::serialize::serialize(&Value::Text("Ada".to_string()))
2638            .expect("encode value-storage payload");
2639
2640        let value =
2641            decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
2642
2643        assert_eq!(value, Value::Text("Ada".to_string()));
2644    }
2645
2646    #[test]
2647    fn encode_slot_value_from_value_roundtrips_scalar_slots() {
2648        let payload = encode_slot_value_from_value(&TEST_MODEL, 0, &Value::Text("Ada".to_string()))
2649            .expect("encode slot");
2650        let decoded =
2651            decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
2652
2653        assert_eq!(decoded, Value::Text("Ada".to_string()));
2654    }
2655
2656    #[test]
2657    fn encode_slot_value_from_value_roundtrips_value_storage_slots() {
2658        let payload = encode_slot_value_from_value(&TEST_MODEL, 1, &Value::Text("Ada".to_string()))
2659            .expect("encode slot");
2660        let decoded =
2661            decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
2662
2663        assert_eq!(decoded, Value::Text("Ada".to_string()));
2664    }
2665
2666    #[test]
2667    fn encode_slot_value_from_value_roundtrips_list_by_kind_slots() {
2668        let payload = encode_slot_value_from_value(
2669            &LIST_MODEL,
2670            0,
2671            &Value::List(vec![Value::Text("alpha".to_string())]),
2672        )
2673        .expect("encode list slot");
2674        let decoded =
2675            decode_slot_value_from_bytes(&LIST_MODEL, 0, payload.as_slice()).expect("decode slot");
2676
2677        assert_eq!(decoded, Value::List(vec![Value::Text("alpha".to_string())]),);
2678    }
2679
2680    #[test]
2681    fn encode_slot_value_from_value_roundtrips_map_by_kind_slots() {
2682        let payload = encode_slot_value_from_value(
2683            &MAP_MODEL,
2684            0,
2685            &Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
2686        )
2687        .expect("encode map slot");
2688        let decoded =
2689            decode_slot_value_from_bytes(&MAP_MODEL, 0, payload.as_slice()).expect("decode slot");
2690
2691        assert_eq!(
2692            decoded,
2693            Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
2694        );
2695    }
2696
2697    #[test]
2698    fn encode_slot_value_from_value_accepts_value_storage_maps_with_structured_values() {
2699        let principal = Principal::dummy(7);
2700        let project = Value::from_map(vec![
2701            (Value::Text("pid".to_string()), Value::Principal(principal)),
2702            (
2703                Value::Text("status".to_string()),
2704                Value::Enum(ValueEnum::new(
2705                    "Saved",
2706                    Some("design::app::user::customise::project::ProjectStatus"),
2707                )),
2708            ),
2709        ])
2710        .expect("project value should normalize into a canonical map");
2711        let projects = Value::from_map(vec![(Value::Principal(principal), project)])
2712            .expect("outer map should normalize into a canonical map");
2713
2714        let payload =
2715            encode_slot_value_from_value(&STRUCTURED_MAP_VALUE_STORAGE_MODEL, 0, &projects)
2716                .expect("encode structured map slot");
2717        let decoded = decode_slot_value_from_bytes(
2718            &STRUCTURED_MAP_VALUE_STORAGE_MODEL,
2719            0,
2720            payload.as_slice(),
2721        )
2722        .expect("decode structured map slot");
2723
2724        assert_eq!(decoded, projects);
2725    }
2726
2727    #[test]
2728    fn encode_slot_value_from_value_roundtrips_enum_by_kind_slots() {
2729        let payload = encode_slot_value_from_value(
2730            &ENUM_MODEL,
2731            0,
2732            &Value::Enum(
2733                ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7)),
2734            ),
2735        )
2736        .expect("encode enum slot");
2737        let decoded =
2738            decode_slot_value_from_bytes(&ENUM_MODEL, 0, payload.as_slice()).expect("decode slot");
2739
2740        assert_eq!(
2741            decoded,
2742            Value::Enum(
2743                ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7,))
2744            ),
2745        );
2746    }
2747
2748    #[test]
2749    fn encode_slot_value_from_value_roundtrips_leaf_by_kind_wrapper_slots() {
2750        let account = Account::from_parts(Principal::dummy(7), Some(Subaccount::from([7_u8; 32])));
2751        let payload = encode_slot_value_from_value(&ACCOUNT_MODEL, 0, &Value::Account(account))
2752            .expect("encode account slot");
2753        let decoded = decode_slot_value_from_bytes(&ACCOUNT_MODEL, 0, payload.as_slice())
2754            .expect("decode slot");
2755
2756        assert_eq!(decoded, Value::Account(account));
2757    }
2758
2759    #[test]
2760    fn custom_slot_payload_roundtrips_structured_field_value() {
2761        let profile = PersistedRowProfileValue {
2762            bio: "Ada".to_string(),
2763        };
2764        let payload = encode_persisted_custom_slot_payload(&profile, "profile")
2765            .expect("encode custom structured payload");
2766        let decoded = decode_persisted_custom_slot_payload::<PersistedRowProfileValue>(
2767            payload.as_slice(),
2768            "profile",
2769        )
2770        .expect("decode custom structured payload");
2771
2772        assert_eq!(decoded, profile);
2773        assert_eq!(
2774            decode_persisted_slot_payload::<Value>(payload.as_slice(), "profile")
2775                .expect("decode raw value payload"),
2776            profile.to_value(),
2777        );
2778    }
2779
2780    #[test]
2781    fn custom_many_slot_payload_roundtrips_structured_value_lists() {
2782        let profiles = vec![
2783            PersistedRowProfileValue {
2784                bio: "Ada".to_string(),
2785            },
2786            PersistedRowProfileValue {
2787                bio: "Grace".to_string(),
2788            },
2789        ];
2790        let payload = encode_persisted_custom_many_slot_payload(profiles.as_slice(), "profiles")
2791            .expect("encode custom structured list payload");
2792        let decoded = decode_persisted_custom_many_slot_payload::<PersistedRowProfileValue>(
2793            payload.as_slice(),
2794            "profiles",
2795        )
2796        .expect("decode custom structured list payload");
2797
2798        assert_eq!(decoded, profiles);
2799    }
2800
2801    #[test]
2802    fn decode_persisted_non_null_slot_payload_rejects_null_for_required_structured_fields() {
2803        let err =
2804            decode_persisted_non_null_slot_payload::<PersistedRowProfileValue>(&[0xF6], "profile")
2805                .expect_err("required structured payload must reject null");
2806
2807        assert!(
2808            err.message
2809                .contains("unexpected null for non-nullable field"),
2810            "unexpected error: {err:?}"
2811        );
2812    }
2813
2814    #[test]
2815    fn decode_persisted_option_slot_payload_treats_cbor_null_as_none() {
2816        let decoded =
2817            decode_persisted_option_slot_payload::<PersistedRowProfileValue>(&[0xF6], "profile")
2818                .expect("optional structured payload should decode");
2819
2820        assert_eq!(decoded, None);
2821    }
2822
2823    #[test]
2824    fn encode_slot_value_from_value_rejects_null_for_required_structured_slots() {
2825        let err = encode_slot_value_from_value(&REQUIRED_STRUCTURED_MODEL, 0, &Value::Null)
2826            .expect_err("required structured slot must reject null");
2827
2828        assert!(
2829            err.message.contains("required field cannot store null"),
2830            "unexpected error: {err:?}"
2831        );
2832    }
2833
2834    #[test]
2835    fn encode_slot_value_from_value_allows_null_for_optional_structured_slots() {
2836        let payload = encode_slot_value_from_value(&OPTIONAL_STRUCTURED_MODEL, 0, &Value::Null)
2837            .expect("optional structured slot should allow null");
2838        let decoded =
2839            decode_slot_value_from_bytes(&OPTIONAL_STRUCTURED_MODEL, 0, payload.as_slice())
2840                .expect("optional structured slot should decode");
2841
2842        assert_eq!(decoded, Value::Null);
2843    }
2844
2845    #[test]
2846    fn encode_slot_value_from_value_rejects_unknown_enum_payload_variants() {
2847        let err = encode_slot_value_from_value(
2848            &ENUM_MODEL,
2849            0,
2850            &Value::Enum(
2851                ValueEnum::new("Unknown", Some("tests::State")).with_payload(Value::Uint(7)),
2852            ),
2853        )
2854        .expect_err("unknown enum payload should fail closed");
2855
2856        assert!(err.message.contains("unknown enum variant"));
2857    }
2858
2859    #[test]
2860    fn structural_slot_reader_and_direct_decode_share_the_same_field_codec_boundary() {
2861        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2862        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2863            .expect("encode value-storage payload");
2864        writer
2865            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2866            .expect("write scalar slot");
2867        writer
2868            .write_slot(1, Some(payload.as_slice()))
2869            .expect("write value-storage slot");
2870        let raw_row = RawRow::try_new(
2871            serialize_row_payload(writer.finish().expect("finish slot payload"))
2872                .expect("serialize row payload"),
2873        )
2874        .expect("build raw row");
2875
2876        let direct_slots =
2877            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
2878        let mut cached_slots =
2879            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
2880
2881        let direct_name = decode_slot_value_by_contract(&direct_slots, 0).expect("decode name");
2882        let direct_payload =
2883            decode_slot_value_by_contract(&direct_slots, 1).expect("decode payload");
2884        let cached_name = cached_slots.get_value(0).expect("cached name");
2885        let cached_payload = cached_slots.get_value(1).expect("cached payload");
2886
2887        assert_eq!(direct_name, cached_name);
2888        assert_eq!(direct_payload, cached_payload);
2889    }
2890
2891    #[test]
2892    fn apply_update_patch_to_raw_row_updates_only_targeted_slots() {
2893        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2894        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2895            .expect("encode value-storage payload");
2896        writer
2897            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2898            .expect("write scalar slot");
2899        writer
2900            .write_slot(1, Some(payload.as_slice()))
2901            .expect("write value-storage slot");
2902        let raw_row = RawRow::try_new(
2903            serialize_row_payload(writer.finish().expect("finish slot payload"))
2904                .expect("serialize row payload"),
2905        )
2906        .expect("build raw row");
2907        let patch = UpdatePatch::new().set(
2908            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2909            Value::Text("Grace".to_string()),
2910        );
2911
2912        let patched =
2913            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
2914        let mut reader =
2915            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2916
2917        assert_eq!(
2918            reader.get_value(0).expect("decode slot"),
2919            Some(Value::Text("Grace".to_string()))
2920        );
2921        assert_eq!(
2922            reader.get_value(1).expect("decode slot"),
2923            Some(Value::Text("payload".to_string()))
2924        );
2925    }
2926
2927    #[test]
2928    fn serialize_update_patch_fields_encodes_canonical_slot_payloads() {
2929        let patch = UpdatePatch::new()
2930            .set(
2931                FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2932                Value::Text("Grace".to_string()),
2933            )
2934            .set(
2935                FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2936                Value::Text("payload".to_string()),
2937            );
2938
2939        let serialized =
2940            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2941
2942        assert_eq!(serialized.entries().len(), 2);
2943        assert_eq!(
2944            decode_slot_value_from_bytes(
2945                &TEST_MODEL,
2946                serialized.entries()[0].slot().index(),
2947                serialized.entries()[0].payload(),
2948            )
2949            .expect("decode slot payload"),
2950            Value::Text("Grace".to_string())
2951        );
2952        assert_eq!(
2953            decode_slot_value_from_bytes(
2954                &TEST_MODEL,
2955                serialized.entries()[1].slot().index(),
2956                serialized.entries()[1].payload(),
2957            )
2958            .expect("decode slot payload"),
2959            Value::Text("payload".to_string())
2960        );
2961    }
2962
2963    #[test]
2964    fn serialized_patch_writer_rejects_clear_slots() {
2965        let mut writer = SerializedPatchWriter::for_model(&TEST_MODEL);
2966
2967        let err = writer
2968            .write_slot(0, None)
2969            .expect_err("0.65 patch staging must reject missing-slot clears");
2970
2971        assert!(
2972            err.message
2973                .contains("serialized patch writer cannot clear slot 0"),
2974            "unexpected error: {err:?}"
2975        );
2976        assert!(
2977            err.message.contains(TEST_MODEL.path()),
2978            "unexpected error: {err:?}"
2979        );
2980    }
2981
2982    #[test]
2983    fn slot_buffer_writer_rejects_clear_slots() {
2984        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2985
2986        let err = writer
2987            .write_slot(0, None)
2988            .expect_err("canonical row staging must reject missing-slot clears");
2989
2990        assert!(
2991            err.message
2992                .contains("slot buffer writer cannot clear slot 0"),
2993            "unexpected error: {err:?}"
2994        );
2995        assert!(
2996            err.message.contains(TEST_MODEL.path()),
2997            "unexpected error: {err:?}"
2998        );
2999    }
3000
3001    #[test]
3002    fn apply_update_patch_to_raw_row_uses_last_write_wins() {
3003        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
3004        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3005            .expect("encode value-storage payload");
3006        writer
3007            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
3008            .expect("write scalar slot");
3009        writer
3010            .write_slot(1, Some(payload.as_slice()))
3011            .expect("write value-storage slot");
3012        let raw_row = RawRow::try_new(
3013            serialize_row_payload(writer.finish().expect("finish slot payload"))
3014                .expect("serialize row payload"),
3015        )
3016        .expect("build raw row");
3017        let slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
3018        let patch = UpdatePatch::new()
3019            .set(slot, Value::Text("Grace".to_string()))
3020            .set(slot, Value::Text("Lin".to_string()));
3021
3022        let patched =
3023            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
3024        let mut reader =
3025            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
3026
3027        assert_eq!(
3028            reader.get_value(0).expect("decode slot"),
3029            Some(Value::Text("Lin".to_string()))
3030        );
3031    }
3032
3033    #[test]
3034    fn apply_update_patch_to_raw_row_rejects_noncanonical_missing_slot_baseline() {
3035        let empty_slots = vec![None::<&[u8]>; TEST_MODEL.fields().len()];
3036        let raw_row = RawRow::try_new(
3037            serialize_row_payload(
3038                encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, empty_slots.as_slice())
3039                    .expect("encode malformed slot payload"),
3040            )
3041            .expect("serialize row payload"),
3042        )
3043        .expect("build raw row");
3044        let patch = UpdatePatch::new().set(
3045            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
3046            Value::Text("payload".to_string()),
3047        );
3048
3049        let err = apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch)
3050            .expect_err("noncanonical rows with missing slots must fail closed");
3051
3052        assert_eq!(
3053            err.message,
3054            "row decode failed: missing slot payload: slot=0"
3055        );
3056    }
3057
3058    #[test]
3059    fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_baseline() {
3060        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3061            .expect("encode value-storage payload");
3062        let malformed_slots = [Some([0xF6].as_slice()), Some(payload.as_slice())];
3063        let raw_row = RawRow::try_new(
3064            serialize_row_payload(
3065                encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, &malformed_slots)
3066                    .expect("encode malformed slot payload"),
3067            )
3068            .expect("serialize row payload"),
3069        )
3070        .expect("build raw row");
3071        let patch = UpdatePatch::new().set(
3072            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
3073            Value::Text("patched".to_string()),
3074        );
3075        let serialized =
3076            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
3077
3078        let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
3079            .expect_err("noncanonical scalar baseline must fail closed");
3080
3081        assert!(
3082            err.message.contains("field 'name'"),
3083            "unexpected error: {err:?}"
3084        );
3085        assert!(
3086            err.message
3087                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
3088            "unexpected error: {err:?}"
3089        );
3090    }
3091
3092    #[test]
3093    fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_patch_payload() {
3094        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
3095        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3096            .expect("encode value-storage payload");
3097        writer
3098            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
3099            .expect("write scalar slot");
3100        writer
3101            .write_slot(1, Some(payload.as_slice()))
3102            .expect("write value-storage slot");
3103        let raw_row = RawRow::try_new(
3104            serialize_row_payload(writer.finish().expect("finish slot payload"))
3105                .expect("serialize row payload"),
3106        )
3107        .expect("build raw row");
3108        let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
3109            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
3110            vec![0xF6],
3111        )]);
3112
3113        let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
3114            .expect_err("noncanonical serialized patch payload must fail closed");
3115
3116        assert!(
3117            err.message.contains("field 'name'"),
3118            "unexpected error: {err:?}"
3119        );
3120        assert!(
3121            err.message
3122                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
3123            "unexpected error: {err:?}"
3124        );
3125    }
3126
3127    #[test]
3128    fn structural_slot_reader_rejects_slot_count_mismatch() {
3129        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
3130        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3131            .expect("encode payload");
3132        writer
3133            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
3134            .expect("write scalar slot");
3135        writer
3136            .write_slot(1, Some(payload.as_slice()))
3137            .expect("write payload slot");
3138        let mut payload = writer.finish().expect("finish slot payload");
3139        payload[..2].copy_from_slice(&1_u16.to_be_bytes());
3140        let raw_row =
3141            RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
3142                .expect("build raw row");
3143
3144        let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
3145            .err()
3146            .expect("slot-count drift must fail closed");
3147
3148        assert_eq!(
3149            err.message,
3150            "row decode failed: slot count mismatch: expected 2, found 1"
3151        );
3152    }
3153
3154    #[test]
3155    fn structural_slot_reader_rejects_slot_span_exceeds_payload_length() {
3156        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
3157        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3158            .expect("encode payload");
3159        writer
3160            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
3161            .expect("write scalar slot");
3162        writer
3163            .write_slot(1, Some(payload.as_slice()))
3164            .expect("write payload slot");
3165        let mut payload = writer.finish().expect("finish slot payload");
3166
3167        // Corrupt the second slot span so the payload table points past the
3168        // available data section.
3169        payload[14..18].copy_from_slice(&u32::MAX.to_be_bytes());
3170        let raw_row =
3171            RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
3172                .expect("build raw row");
3173
3174        let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
3175            .err()
3176            .expect("slot span drift must fail closed");
3177
3178        assert_eq!(
3179            err.message,
3180            "row decode failed: slot span exceeds payload length"
3181        );
3182    }
3183
3184    #[test]
3185    fn apply_serialized_update_patch_to_raw_row_replays_preencoded_slots() {
3186        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
3187        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3188            .expect("encode value-storage payload");
3189        writer
3190            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
3191            .expect("write scalar slot");
3192        writer
3193            .write_slot(1, Some(payload.as_slice()))
3194            .expect("write value-storage slot");
3195        let raw_row = RawRow::try_new(
3196            serialize_row_payload(writer.finish().expect("finish slot payload"))
3197                .expect("serialize row payload"),
3198        )
3199        .expect("build raw row");
3200        let patch = UpdatePatch::new().set(
3201            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
3202            Value::Text("Grace".to_string()),
3203        );
3204        let serialized =
3205            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
3206
3207        let patched = raw_row
3208            .apply_serialized_update_patch(&TEST_MODEL, &serialized)
3209            .expect("apply serialized patch");
3210        let mut reader =
3211            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
3212
3213        assert_eq!(
3214            reader.get_value(0).expect("decode slot"),
3215            Some(Value::Text("Grace".to_string()))
3216        );
3217    }
3218
3219    #[test]
3220    fn serialize_entity_slots_as_update_patch_replays_full_typed_after_image() {
3221        let old_entity = PersistedRowPatchBridgeEntity {
3222            id: crate::types::Ulid::from_u128(7),
3223            name: "Ada".to_string(),
3224        };
3225        let new_entity = PersistedRowPatchBridgeEntity {
3226            id: crate::types::Ulid::from_u128(7),
3227            name: "Grace".to_string(),
3228        };
3229        let raw_row = RawRow::from_entity(&old_entity).expect("encode old row");
3230        let old_decoded = raw_row
3231            .try_decode::<PersistedRowPatchBridgeEntity>()
3232            .expect("decode old entity");
3233        let serialized =
3234            serialize_entity_slots_as_update_patch(&new_entity).expect("serialize entity patch");
3235        let direct =
3236            RawRow::from_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
3237                .expect("direct row emission should succeed");
3238
3239        let patched = raw_row
3240            .apply_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
3241            .expect("apply serialized patch");
3242        let decoded = patched
3243            .try_decode::<PersistedRowPatchBridgeEntity>()
3244            .expect("decode patched entity");
3245
3246        assert_eq!(
3247            direct, patched,
3248            "fresh row emission and replayed full-image patch must converge on identical bytes",
3249        );
3250        assert_eq!(old_decoded, old_entity);
3251        assert_eq!(decoded, new_entity);
3252    }
3253
3254    #[test]
3255    fn canonical_row_from_raw_row_replays_canonical_full_image_bytes() {
3256        let entity = PersistedRowPatchBridgeEntity {
3257            id: crate::types::Ulid::from_u128(11),
3258            name: "Ada".to_string(),
3259        };
3260        let raw_row = RawRow::from_entity(&entity).expect("encode canonical row");
3261        let canonical = crate::db::data::canonical_row_from_raw_row(
3262            PersistedRowPatchBridgeEntity::MODEL,
3263            &raw_row,
3264        )
3265        .expect("canonical re-emission should succeed");
3266
3267        assert_eq!(
3268            canonical.as_bytes(),
3269            raw_row.as_bytes(),
3270            "canonical raw-row rebuild must preserve already canonical row bytes",
3271        );
3272    }
3273
3274    #[test]
3275    fn canonical_row_from_raw_row_rejects_noncanonical_scalar_payload() {
3276        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3277            .expect("encode value-storage payload");
3278        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
3279        writer
3280            .write_slot(0, Some(&[0xF6]))
3281            .expect("write malformed scalar slot");
3282        writer
3283            .write_slot(1, Some(payload.as_slice()))
3284            .expect("write value-storage slot");
3285        let raw_row = RawRow::try_new(
3286            serialize_row_payload(writer.finish().expect("finish slot payload"))
3287                .expect("serialize malformed row"),
3288        )
3289        .expect("build malformed raw row");
3290
3291        let err = crate::db::data::canonical_row_from_raw_row(&TEST_MODEL, &raw_row)
3292            .expect_err("canonical raw-row rebuild must reject malformed scalar payloads");
3293
3294        assert!(
3295            err.message.contains("field 'name'"),
3296            "unexpected error: {err:?}"
3297        );
3298        assert!(
3299            err.message
3300                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
3301            "unexpected error: {err:?}"
3302        );
3303    }
3304
3305    #[test]
3306    fn raw_row_from_serialized_update_patch_rejects_noncanonical_scalar_payload() {
3307        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
3308            .expect("encode value-storage payload");
3309        let serialized = SerializedUpdatePatch::new(vec![
3310            SerializedFieldUpdate::new(
3311                FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
3312                vec![0xF6],
3313            ),
3314            SerializedFieldUpdate::new(
3315                FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
3316                payload,
3317            ),
3318        ]);
3319
3320        let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
3321            .expect_err("fresh row emission must reject noncanonical serialized patch payloads");
3322
3323        assert!(
3324            err.message.contains("field 'name'"),
3325            "unexpected error: {err:?}"
3326        );
3327        assert!(
3328            err.message
3329                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
3330            "unexpected error: {err:?}"
3331        );
3332    }
3333
3334    #[test]
3335    fn raw_row_from_serialized_update_patch_rejects_incomplete_slot_image() {
3336        let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
3337            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
3338            crate::serialize::serialize(&Value::Text("payload".to_string()))
3339                .expect("encode value-storage payload"),
3340        )]);
3341
3342        let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
3343            .expect_err("fresh row emission must reject missing declared slots");
3344
3345        assert!(
3346            err.message.contains("serialized patch did not emit slot 0"),
3347            "unexpected error: {err:?}"
3348        );
3349    }
3350}