Skip to main content

icydb_core/db/data/persisted_row/
mod.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7mod codec;
8
9use crate::{
10    db::{
11        codec::serialize_row_payload,
12        data::{
13            CanonicalRow, DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
14            decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
15            decode_structural_value_storage_bytes,
16        },
17        scalar_expr::compile_scalar_literal_expr_value,
18        schema::{field_type_from_model_kind, literal_matches_type},
19    },
20    error::InternalError,
21    model::{
22        entity::{EntityModel, resolve_field_slot, resolve_primary_key_slot},
23        field::{FieldKind, FieldModel, FieldStorageDecode, LeafCodec},
24    },
25    serialize::serialize,
26    traits::EntityKind,
27    value::{StorageKey, Value, ValueEnum},
28};
29use serde_cbor::{Value as CborValue, value::to_value as to_cbor_value};
30use std::{cmp::Ordering, collections::BTreeMap};
31
32use self::codec::{decode_scalar_slot_value, encode_scalar_slot_value};
33
34pub use self::codec::{
35    PersistedScalar, ScalarSlotValueRef, ScalarValueRef, decode_persisted_custom_many_slot_payload,
36    decode_persisted_custom_slot_payload, decode_persisted_non_null_slot_payload,
37    decode_persisted_option_scalar_slot_payload, decode_persisted_option_slot_payload,
38    decode_persisted_scalar_slot_payload, decode_persisted_slot_payload,
39    encode_persisted_custom_many_slot_payload, encode_persisted_custom_slot_payload,
40    encode_persisted_option_scalar_slot_payload, encode_persisted_scalar_slot_payload,
41    encode_persisted_slot_payload,
42};
43
44///
45/// FieldSlot
46///
47/// FieldSlot
48///
49/// FieldSlot is the structural stable slot reference used by the `0.64`
50/// patching path.
51/// It intentionally carries only the model-local slot index so field-level
52/// mutation stays structural instead of reintroducing typed entity helpers.
53///
54
55#[allow(dead_code)]
56#[derive(Clone, Copy, Debug, Eq, PartialEq)]
57pub(in crate::db) struct FieldSlot {
58    index: usize,
59}
60
61#[allow(dead_code)]
62impl FieldSlot {
63    /// Resolve one stable field slot by runtime field name.
64    #[must_use]
65    pub(in crate::db) fn resolve(model: &'static EntityModel, field_name: &str) -> Option<Self> {
66        resolve_field_slot(model, field_name).map(|index| Self { index })
67    }
68
69    /// Build one stable field slot from an already validated index.
70    pub(in crate::db) fn from_index(
71        model: &'static EntityModel,
72        index: usize,
73    ) -> Result<Self, InternalError> {
74        field_model_for_slot(model, index)?;
75
76        Ok(Self { index })
77    }
78
79    /// Return the stable slot index inside `EntityModel::fields`.
80    #[must_use]
81    pub(in crate::db) const fn index(self) -> usize {
82        self.index
83    }
84}
85
86///
87/// FieldUpdate
88///
89/// FieldUpdate
90///
91/// FieldUpdate carries one ordered field-level mutation over the structural
92/// persisted-row boundary.
93/// `UpdatePatch` applies these entries in order and last write wins for the
94/// same slot.
95///
96
97#[allow(dead_code)]
98#[derive(Clone, Debug, Eq, PartialEq)]
99pub(in crate::db) struct FieldUpdate {
100    slot: FieldSlot,
101    value: Value,
102}
103
104#[allow(dead_code)]
105impl FieldUpdate {
106    /// Build one field-level structural update.
107    #[must_use]
108    pub(in crate::db) const fn new(slot: FieldSlot, value: Value) -> Self {
109        Self { slot, value }
110    }
111
112    /// Return the stable target slot.
113    #[must_use]
114    pub(in crate::db) const fn slot(&self) -> FieldSlot {
115        self.slot
116    }
117
118    /// Return the runtime value payload for this update.
119    #[must_use]
120    pub(in crate::db) const fn value(&self) -> &Value {
121        &self.value
122    }
123}
124
125///
126/// UpdatePatch
127///
128/// UpdatePatch
129///
130/// UpdatePatch is the ordered structural mutation program applied to one
131/// persisted row.
132/// This is the phase-1 `0.64` patch container: it updates slot values
133/// structurally and then re-encodes the full row.
134///
135
136#[derive(Clone, Debug, Default, Eq, PartialEq)]
137pub struct UpdatePatch {
138    entries: Vec<FieldUpdate>,
139}
140
141impl UpdatePatch {
142    /// Build one empty patch.
143    #[must_use]
144    pub const fn new() -> Self {
145        Self {
146            entries: Vec::new(),
147        }
148    }
149
150    /// Append one structural field update in declaration order.
151    #[must_use]
152    pub(in crate::db) fn set(mut self, slot: FieldSlot, value: Value) -> Self {
153        self.entries.push(FieldUpdate::new(slot, value));
154        self
155    }
156
157    /// Resolve one field name and append its structural update.
158    pub fn set_field(
159        self,
160        model: &'static EntityModel,
161        field_name: &str,
162        value: Value,
163    ) -> Result<Self, InternalError> {
164        let Some(slot) = FieldSlot::resolve(model, field_name) else {
165            return Err(InternalError::mutation_structural_field_unknown(
166                model.path(),
167                field_name,
168            ));
169        };
170
171        Ok(self.set(slot, value))
172    }
173
174    /// Borrow the ordered field updates carried by this patch.
175    #[must_use]
176    pub(in crate::db) const fn entries(&self) -> &[FieldUpdate] {
177        self.entries.as_slice()
178    }
179
180    /// Return whether this patch carries no field updates.
181    #[must_use]
182    pub(in crate::db) const fn is_empty(&self) -> bool {
183        self.entries.is_empty()
184    }
185}
186
187///
188/// SerializedFieldUpdate
189///
190/// SerializedFieldUpdate
191///
192/// SerializedFieldUpdate carries one ordered field-level mutation after the
193/// owning persisted-row field codec has already lowered the runtime `Value`
194/// into canonical slot payload bytes.
195/// This lets later patch-application stages consume one mechanical slot-patch
196/// artifact instead of rebuilding per-field encode dispatch.
197///
198
199#[allow(dead_code)]
200#[derive(Clone, Debug, Eq, PartialEq)]
201pub(in crate::db) struct SerializedFieldUpdate {
202    slot: FieldSlot,
203    payload: Vec<u8>,
204}
205
206#[allow(dead_code)]
207impl SerializedFieldUpdate {
208    /// Build one serialized structural field update.
209    #[must_use]
210    pub(in crate::db) const fn new(slot: FieldSlot, payload: Vec<u8>) -> Self {
211        Self { slot, payload }
212    }
213
214    /// Return the stable target slot.
215    #[must_use]
216    pub(in crate::db) const fn slot(&self) -> FieldSlot {
217        self.slot
218    }
219
220    /// Borrow the canonical slot payload bytes for this update when present.
221    #[must_use]
222    pub(in crate::db) const fn payload(&self) -> &[u8] {
223        self.payload.as_slice()
224    }
225}
226
227///
228/// SerializedUpdatePatch
229///
230/// SerializedUpdatePatch
231///
232/// SerializedUpdatePatch is the canonical serialized form of `UpdatePatch`
233/// over persisted-row slot payload bytes.
234/// This is the structural patch artifact later write-path stages can stage or
235/// replay without re-entering field-contract encode logic.
236///
237
238#[allow(dead_code)]
239#[derive(Clone, Debug, Default, Eq, PartialEq)]
240pub(in crate::db) struct SerializedUpdatePatch {
241    entries: Vec<SerializedFieldUpdate>,
242}
243
244#[allow(dead_code)]
245impl SerializedUpdatePatch {
246    /// Build one serialized patch from already encoded slot payloads.
247    #[must_use]
248    pub(in crate::db) const fn new(entries: Vec<SerializedFieldUpdate>) -> Self {
249        Self { entries }
250    }
251
252    /// Borrow the ordered serialized field updates carried by this patch.
253    #[must_use]
254    pub(in crate::db) const fn entries(&self) -> &[SerializedFieldUpdate] {
255        self.entries.as_slice()
256    }
257
258    /// Return whether this serialized patch carries no field updates.
259    #[must_use]
260    pub(in crate::db) const fn is_empty(&self) -> bool {
261        self.entries.is_empty()
262    }
263}
264
265///
266/// SlotReader
267///
268/// SlotReader exposes one persisted row as stable slot-addressable fields.
269/// Callers may inspect field presence, borrow raw field bytes, or decode one
270/// field value on demand.
271///
272
273pub trait SlotReader {
274    /// Return the structural model that owns this slot mapping.
275    fn model(&self) -> &'static EntityModel;
276
277    /// Return whether the given slot is present in the persisted row.
278    fn has(&self, slot: usize) -> bool;
279
280    /// Borrow the raw persisted payload for one slot when present.
281    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
282
283    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
284    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
285
286    /// Decode one slot value on demand using the field contract declared by the model.
287    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
288}
289
290///
291/// CanonicalSlotReader
292///
293/// CanonicalSlotReader
294///
295/// CanonicalSlotReader is the stricter structural row-reader contract used
296/// once `0.65` canonical-row invariants are in force.
297/// Declared slots must already exist, so callers can fail closed on missing
298/// payloads instead of carrying absent-slot fallback branches.
299///
300
301pub(in crate::db) trait CanonicalSlotReader: SlotReader {
302    /// Borrow one declared slot payload, erroring when the persisted row is not canonical.
303    fn required_bytes(&self, slot: usize) -> Result<&[u8], InternalError> {
304        let field = field_model_for_slot(self.model(), slot)?;
305
306        self.get_bytes(slot)
307            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
308    }
309
310    /// Read one scalar slot through the structural fast path without allowing
311    /// declared-slot absence.
312    fn required_scalar(&self, slot: usize) -> Result<ScalarSlotValueRef<'_>, InternalError> {
313        let field = field_model_for_slot(self.model(), slot)?;
314        debug_assert!(matches!(field.leaf_codec(), LeafCodec::Scalar(_)));
315
316        self.get_scalar(slot)?
317            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
318    }
319
320    /// Decode one declared slot through the owning field contract without
321    /// allowing absent payloads.
322    fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
323        decode_slot_value_from_bytes(self.model(), slot, self.required_bytes(slot)?)
324    }
325}
326
327///
328/// SlotWriter
329///
330/// SlotWriter is the canonical row-container output seam used by persisted-row
331/// writers.
332///
333
334pub trait SlotWriter {
335    /// Record one slot payload for the current row.
336    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
337
338    /// Record one scalar slot payload using the canonical scalar leaf envelope.
339    fn write_scalar(
340        &mut self,
341        slot: usize,
342        value: ScalarSlotValueRef<'_>,
343    ) -> Result<(), InternalError> {
344        let payload = encode_scalar_slot_value(value);
345
346        self.write_slot(slot, Some(payload.as_slice()))
347    }
348}
349
350// Resolve one staged slot cell by layout index before writer-specific payload handling.
351fn slot_cell_mut<T>(slots: &mut [T], slot: usize) -> Result<&mut T, InternalError> {
352    slots.get_mut(slot).ok_or_else(|| {
353        InternalError::persisted_row_encode_failed(
354            format!("slot {slot} is outside the row layout",),
355        )
356    })
357}
358
359// Reject slot clears at the canonical slot-image staging boundary while keeping
360// writer-specific error wording at the call site.
361fn required_slot_payload_bytes<'a>(
362    model: &'static EntityModel,
363    writer_label: &str,
364    slot: usize,
365    payload: Option<&'a [u8]>,
366) -> Result<&'a [u8], InternalError> {
367    payload.ok_or_else(|| {
368        InternalError::persisted_row_encode_failed(format!(
369            "{writer_label} cannot clear slot {slot} for entity '{}'",
370            model.path()
371        ))
372    })
373}
374
375// Encode one fixed-width slot table plus concatenated slot payload bytes into
376// the canonical row payload container.
377fn encode_slot_payload_from_parts(
378    slot_count: usize,
379    slot_table: &[(u32, u32)],
380    payload_bytes: &[u8],
381) -> Result<Vec<u8>, InternalError> {
382    let field_count = u16::try_from(slot_count).map_err(|_| {
383        InternalError::persisted_row_encode_failed(format!(
384            "field count {slot_count} exceeds u16 slot table capacity",
385        ))
386    })?;
387    let mut encoded = Vec::with_capacity(
388        usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
389    );
390    encoded.extend_from_slice(&field_count.to_be_bytes());
391    for (start, len) in slot_table {
392        encoded.extend_from_slice(&start.to_be_bytes());
393        encoded.extend_from_slice(&len.to_be_bytes());
394    }
395    encoded.extend_from_slice(payload_bytes);
396
397    Ok(encoded)
398}
399
400///
401/// PersistedRow
402///
403/// PersistedRow is the derive-owned bridge between typed entities and
404/// slot-addressable persisted rows.
405/// It owns entity-specific materialization/default semantics while runtime
406/// paths stay structural at the row boundary.
407///
408
409pub trait PersistedRow: EntityKind + Sized {
410    /// Materialize one typed entity from one slot reader.
411    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
412
413    /// Write one typed entity into one slot writer.
414    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
415
416    /// Decode one slot value needed by structural planner/projection consumers.
417    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
418    where
419        Self: crate::traits::FieldProjection,
420    {
421        let entity = Self::materialize_from_slots(slots)?;
422
423        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
424    }
425}
426
427/// Decode one slot value through the declared field contract without routing
428/// through `SlotReader::get_value`.
429#[cfg(test)]
430pub(in crate::db) fn decode_slot_value_by_contract(
431    slots: &dyn SlotReader,
432    slot: usize,
433) -> Result<Option<Value>, InternalError> {
434    let Some(raw_value) = slots.get_bytes(slot) else {
435        return Ok(None);
436    };
437
438    decode_slot_value_from_bytes(slots.model(), slot, raw_value).map(Some)
439}
440
441/// Decode one structural slot payload using the owning model field contract.
442///
443/// This is the canonical field-level decode boundary for persisted-row bytes.
444/// Higher-level row readers may still cache decoded values, but they should not
445/// rebuild scalar-vs-CBOR field dispatch themselves.
446pub(in crate::db) fn decode_slot_value_from_bytes(
447    model: &'static EntityModel,
448    slot: usize,
449    raw_value: &[u8],
450) -> Result<Value, InternalError> {
451    let field = field_model_for_slot(model, slot)?;
452
453    match field.leaf_codec() {
454        LeafCodec::Scalar(codec) => match decode_scalar_slot_value(raw_value, codec, field.name())?
455        {
456            ScalarSlotValueRef::Null => Ok(Value::Null),
457            ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
458        },
459        LeafCodec::CborFallback => decode_non_scalar_slot_value(raw_value, field),
460    }
461}
462
463/// Encode one structural slot value using the owning model field contract.
464///
465/// This is the initial `0.64` write-side field-codec boundary. It currently
466/// covers:
467/// - scalar leaf slots
468/// - `FieldStorageDecode::Value` slots
469///
470/// Composite `ByKind` field encoding remains a follow-up slice so the runtime
471/// can add one structural encoder owner instead of quietly rebuilding typed
472/// per-field branches.
473#[allow(dead_code)]
474pub(in crate::db) fn encode_slot_value_from_value(
475    model: &'static EntityModel,
476    slot: usize,
477    value: &Value,
478) -> Result<Vec<u8>, InternalError> {
479    let field = field_model_for_slot(model, slot)?;
480    ensure_slot_value_matches_field_contract(field, value)?;
481
482    match field.storage_decode() {
483        FieldStorageDecode::Value => serialize(value)
484            .map_err(|err| InternalError::persisted_row_field_encode_failed(field.name(), err)),
485        FieldStorageDecode::ByKind => match field.leaf_codec() {
486            LeafCodec::Scalar(_) => {
487                let scalar = compile_scalar_literal_expr_value(value).ok_or_else(|| {
488                    InternalError::persisted_row_field_encode_failed(
489                        field.name(),
490                        format!(
491                            "field kind {:?} requires a scalar runtime value, found {value:?}",
492                            field.kind()
493                        ),
494                    )
495                })?;
496
497                Ok(encode_scalar_slot_value(scalar.as_slot_value_ref()))
498            }
499            LeafCodec::CborFallback => {
500                encode_structural_field_bytes_by_kind(field.kind(), value, field.name())
501            }
502        },
503    }
504}
505
506// Decode one slot payload and immediately re-encode it through the current
507// field contract so every row-emission path normalizes bytes at the boundary.
508fn canonicalize_slot_payload(
509    model: &'static EntityModel,
510    slot: usize,
511    raw_value: &[u8],
512) -> Result<Vec<u8>, InternalError> {
513    let value = decode_slot_value_from_bytes(model, slot, raw_value)?;
514
515    encode_slot_value_from_value(model, slot, &value)
516}
517
518// Build one dense canonical slot image from any slot-addressable payload source.
519// Callers keep ownership of missing-slot policy while this helper centralizes
520// the slot-by-slot canonicalization loop.
521fn dense_canonical_slot_image_from_payload_source<'a, F>(
522    model: &'static EntityModel,
523    mut payload_for_slot: F,
524) -> Result<Vec<Vec<u8>>, InternalError>
525where
526    F: FnMut(usize) -> Result<&'a [u8], InternalError>,
527{
528    let mut slot_payloads = Vec::with_capacity(model.fields().len());
529
530    for slot in 0..model.fields().len() {
531        let payload = payload_for_slot(slot)?;
532        slot_payloads.push(canonicalize_slot_payload(model, slot, payload)?);
533    }
534
535    Ok(slot_payloads)
536}
537
538// Emit one raw row from a dense canonical slot image.
539fn emit_raw_row_from_slot_payloads(
540    model: &'static EntityModel,
541    slot_payloads: &[Vec<u8>],
542) -> Result<CanonicalRow, InternalError> {
543    if slot_payloads.len() != model.fields().len() {
544        return Err(InternalError::persisted_row_encode_failed(format!(
545            "canonical slot image expected {} slots for entity '{}', found {}",
546            model.fields().len(),
547            model.path(),
548            slot_payloads.len()
549        )));
550    }
551
552    let mut writer = SlotBufferWriter::for_model(model);
553
554    // Phase 1: write the already canonicalized dense slot image through the
555    // single row-container authority.
556    for (slot, payload) in slot_payloads.iter().enumerate() {
557        writer.write_slot(slot, Some(payload.as_slice()))?;
558    }
559
560    // Phase 2: wrap the canonical slot container in the shared row envelope.
561    let encoded = serialize_row_payload(writer.finish()?)?;
562    let raw_row = RawRow::from_untrusted_bytes(encoded).map_err(InternalError::from)?;
563
564    Ok(CanonicalRow::from_canonical_raw_row(raw_row))
565}
566
567// Build one dense canonical slot image from a serialized patch, failing closed
568// when any declared slot is missing or any payload is non-canonical.
569fn dense_canonical_slot_image_from_serialized_patch(
570    model: &'static EntityModel,
571    patch: &SerializedUpdatePatch,
572) -> Result<Vec<Vec<u8>>, InternalError> {
573    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
574
575    dense_canonical_slot_image_from_payload_source(model, |slot| {
576        patch_payloads[slot].ok_or_else(|| {
577            InternalError::persisted_row_encode_failed(format!(
578                "serialized patch did not emit slot {slot} for entity '{}'",
579                model.path()
580            ))
581        })
582    })
583}
584
585/// Build one canonical row from one serialized structural patch that already
586/// describes a full logical row image.
587pub(in crate::db) fn canonical_row_from_serialized_update_patch(
588    model: &'static EntityModel,
589    patch: &SerializedUpdatePatch,
590) -> Result<CanonicalRow, InternalError> {
591    let slot_payloads = dense_canonical_slot_image_from_serialized_patch(model, patch)?;
592
593    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
594}
595
596// Rebuild one full canonical row image from an existing raw row before it
597// crosses a storage write boundary.
598pub(in crate::db) fn canonical_row_from_raw_row(
599    model: &'static EntityModel,
600    raw_row: &RawRow,
601) -> Result<CanonicalRow, InternalError> {
602    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
603        .map_err(StructuralRowDecodeError::into_internal_error)?;
604
605    // Phase 1: canonicalize every declared slot from the existing row image.
606    let slot_payloads = dense_canonical_slot_image_from_payload_source(model, |slot| {
607        field_bytes.field(slot).ok_or_else(|| {
608            InternalError::persisted_row_encode_failed(format!(
609                "slot {slot} is missing from the baseline row for entity '{}'",
610                model.path()
611            ))
612        })
613    })?;
614
615    // Phase 2: re-emit the full image through the single row-emission owner.
616    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
617}
618
619// Rewrap one row already loaded from storage as a canonical write token.
620pub(in crate::db) const fn canonical_row_from_stored_raw_row(raw_row: RawRow) -> CanonicalRow {
621    CanonicalRow::from_canonical_raw_row(raw_row)
622}
623
624/// Apply one ordered structural patch to one raw row using the current
625/// persisted-row field codec authority.
626#[allow(dead_code)]
627pub(in crate::db) fn apply_update_patch_to_raw_row(
628    model: &'static EntityModel,
629    raw_row: &RawRow,
630    patch: &UpdatePatch,
631) -> Result<CanonicalRow, InternalError> {
632    let serialized_patch = serialize_update_patch_fields(model, patch)?;
633
634    apply_serialized_update_patch_to_raw_row(model, raw_row, &serialized_patch)
635}
636
637/// Serialize one ordered structural patch into canonical slot payload bytes.
638///
639/// This is the phase-1 partial-serialization seam for `0.64`: later mutation
640/// stages can stage or replay one field patch without rebuilding the runtime
641/// value-to-bytes contract per consumer.
642#[allow(dead_code)]
643pub(in crate::db) fn serialize_update_patch_fields(
644    model: &'static EntityModel,
645    patch: &UpdatePatch,
646) -> Result<SerializedUpdatePatch, InternalError> {
647    if patch.is_empty() {
648        return Ok(SerializedUpdatePatch::default());
649    }
650
651    let mut entries = Vec::with_capacity(patch.entries().len());
652
653    // Phase 1: validate and encode each ordered field update through the
654    // canonical slot codec owner.
655    for entry in patch.entries() {
656        let slot = entry.slot();
657        let payload = encode_slot_value_from_value(model, slot.index(), entry.value())?;
658        entries.push(SerializedFieldUpdate::new(slot, payload));
659    }
660
661    Ok(SerializedUpdatePatch::new(entries))
662}
663
664/// Serialize one full typed entity image into the canonical serialized patch
665/// artifact used by row-boundary patch replay.
666///
667/// This keeps typed save/update APIs on the existing surface while moving the
668/// actual after-image staging onto the structural slot-patch boundary.
669#[allow(dead_code)]
670pub(in crate::db) fn serialize_entity_slots_as_update_patch<E>(
671    entity: &E,
672) -> Result<SerializedUpdatePatch, InternalError>
673where
674    E: PersistedRow,
675{
676    let mut writer = SerializedPatchWriter::for_model(E::MODEL);
677
678    // Phase 1: let the derive-owned persisted-row writer emit the complete
679    // structural slot image for this entity.
680    entity.write_slots(&mut writer)?;
681
682    // Phase 2: require a dense slot image so save/update replay remains
683    // equivalent to the existing full-row write semantics.
684    writer.finish_complete()
685}
686
687/// Apply one serialized structural patch to one raw row.
688///
689/// This mechanical replay step no longer owns any `Value -> bytes` dispatch.
690/// It only replays already encoded slot payloads over the current row layout.
691#[allow(dead_code)]
692pub(in crate::db) fn apply_serialized_update_patch_to_raw_row(
693    model: &'static EntityModel,
694    raw_row: &RawRow,
695    patch: &SerializedUpdatePatch,
696) -> Result<CanonicalRow, InternalError> {
697    if patch.is_empty() {
698        return canonical_row_from_raw_row(model, raw_row);
699    }
700
701    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
702        .map_err(StructuralRowDecodeError::into_internal_error)?;
703    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
704
705    // Phase 1: replay the current row layout slot-by-slot.
706    // Both patch and baseline bytes are normalized through the field contract
707    // so no opaque payload can cross into the emitted row image.
708    let slot_payloads = dense_canonical_slot_image_from_payload_source(model, |slot| {
709        if let Some(payload) = patch_payloads[slot] {
710            Ok(payload)
711        } else {
712            field_bytes.field(slot).ok_or_else(|| {
713                InternalError::persisted_row_encode_failed(format!(
714                    "slot {slot} is missing from the baseline row for entity '{}'",
715                    model.path()
716                ))
717            })
718        }
719    })?;
720
721    // Phase 2: emit the rebuilt row through the single row-construction owner.
722    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
723}
724
725// Decode one non-scalar slot through the exact persisted contract declared by
726// the field model.
727fn decode_non_scalar_slot_value(
728    raw_value: &[u8],
729    field: &FieldModel,
730) -> Result<Value, InternalError> {
731    let decoded = match field.storage_decode() {
732        crate::model::field::FieldStorageDecode::ByKind => {
733            decode_structural_field_by_kind_bytes(raw_value, field.kind())
734        }
735        crate::model::field::FieldStorageDecode::Value => {
736            decode_structural_value_storage_bytes(raw_value)
737        }
738    };
739
740    decoded.map_err(|err| {
741        InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
742    })
743}
744
745// Validate one runtime value against the persisted field contract before field-
746// level structural encoding writes bytes into a row slot.
747#[allow(dead_code)]
748fn ensure_slot_value_matches_field_contract(
749    field: &FieldModel,
750    value: &Value,
751) -> Result<(), InternalError> {
752    if matches!(value, Value::Null) {
753        if field.nullable() {
754            return Ok(());
755        }
756
757        return Err(InternalError::persisted_row_field_encode_failed(
758            field.name(),
759            "required field cannot store null",
760        ));
761    }
762
763    // `FieldStorageDecode::Value` fields persist the generic `Value` envelope
764    // directly, so storage-side validation must accept structured leaves nested
765    // under collection contracts instead of reusing the predicate literal gate.
766    if matches!(field.storage_decode(), FieldStorageDecode::Value) {
767        if !storage_value_matches_field_kind(field.kind(), value) {
768            return Err(InternalError::persisted_row_field_encode_failed(
769                field.name(),
770                format!(
771                    "field kind {:?} does not accept runtime value {value:?}",
772                    field.kind()
773                ),
774            ));
775        }
776
777        ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
778
779        return ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value);
780    }
781
782    let field_type = field_type_from_model_kind(&field.kind());
783    if !literal_matches_type(value, &field_type) {
784        return Err(InternalError::persisted_row_field_encode_failed(
785            field.name(),
786            format!(
787                "field kind {:?} does not accept runtime value {value:?}",
788                field.kind()
789            ),
790        ));
791    }
792
793    ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
794    ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value)
795}
796
797// Match one runtime value against the semantic field kind used by value-backed
798// storage. Unlike predicate literals, non-queryable structured leaves are valid
799// persisted payloads when they arrive as canonical `Value::List` / `Value::Map`
800// shapes.
801fn storage_value_matches_field_kind(kind: FieldKind, value: &Value) -> bool {
802    match (kind, value) {
803        (FieldKind::Account, Value::Account(_))
804        | (FieldKind::Blob, Value::Blob(_))
805        | (FieldKind::Bool, Value::Bool(_))
806        | (FieldKind::Date, Value::Date(_))
807        | (FieldKind::Decimal { .. }, Value::Decimal(_))
808        | (FieldKind::Duration, Value::Duration(_))
809        | (FieldKind::Enum { .. }, Value::Enum(_))
810        | (FieldKind::Float32, Value::Float32(_))
811        | (FieldKind::Float64, Value::Float64(_))
812        | (FieldKind::Int, Value::Int(_))
813        | (FieldKind::Int128, Value::Int128(_))
814        | (FieldKind::IntBig, Value::IntBig(_))
815        | (FieldKind::Principal, Value::Principal(_))
816        | (FieldKind::Subaccount, Value::Subaccount(_))
817        | (FieldKind::Text, Value::Text(_))
818        | (FieldKind::Timestamp, Value::Timestamp(_))
819        | (FieldKind::Uint, Value::Uint(_))
820        | (FieldKind::Uint128, Value::Uint128(_))
821        | (FieldKind::UintBig, Value::UintBig(_))
822        | (FieldKind::Ulid, Value::Ulid(_))
823        | (FieldKind::Unit, Value::Unit)
824        | (FieldKind::Structured { .. }, Value::List(_) | Value::Map(_)) => true,
825        (FieldKind::Relation { key_kind, .. }, value) => {
826            storage_value_matches_field_kind(*key_kind, value)
827        }
828        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => items
829            .iter()
830            .all(|item| storage_value_matches_field_kind(*inner, item)),
831        (FieldKind::Map { key, value }, Value::Map(entries)) => {
832            if Value::validate_map_entries(entries.as_slice()).is_err() {
833                return false;
834            }
835
836            entries.iter().all(|(entry_key, entry_value)| {
837                storage_value_matches_field_kind(*key, entry_key)
838                    && storage_value_matches_field_kind(*value, entry_value)
839            })
840        }
841        _ => false,
842    }
843}
844
845// Enforce fixed decimal scales through nested collection/map shapes before a
846// field-level patch value is persisted.
847#[allow(dead_code)]
848fn ensure_decimal_scale_matches(
849    field_name: &str,
850    kind: FieldKind,
851    value: &Value,
852) -> Result<(), InternalError> {
853    if matches!(value, Value::Null) {
854        return Ok(());
855    }
856
857    match (kind, value) {
858        (FieldKind::Decimal { scale }, Value::Decimal(decimal)) => {
859            if decimal.scale() != scale {
860                return Err(InternalError::persisted_row_field_encode_failed(
861                    field_name,
862                    format!(
863                        "decimal scale mismatch: expected {scale}, found {}",
864                        decimal.scale()
865                    ),
866                ));
867            }
868
869            Ok(())
870        }
871        (FieldKind::Relation { key_kind, .. }, value) => {
872            ensure_decimal_scale_matches(field_name, *key_kind, value)
873        }
874        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
875            for item in items {
876                ensure_decimal_scale_matches(field_name, *inner, item)?;
877            }
878
879            Ok(())
880        }
881        (
882            FieldKind::Map {
883                key,
884                value: map_value,
885            },
886            Value::Map(entries),
887        ) => {
888            for (entry_key, entry_value) in entries {
889                ensure_decimal_scale_matches(field_name, *key, entry_key)?;
890                ensure_decimal_scale_matches(field_name, *map_value, entry_value)?;
891            }
892
893            Ok(())
894        }
895        _ => Ok(()),
896    }
897}
898
899// Enforce the canonical persisted ordering rules for set/map shapes before one
900// field-level patch value becomes row bytes.
901#[allow(dead_code)]
902fn ensure_value_is_deterministic_for_storage(
903    field_name: &str,
904    kind: FieldKind,
905    value: &Value,
906) -> Result<(), InternalError> {
907    match (kind, value) {
908        (FieldKind::Set(_), Value::List(items)) => {
909            for pair in items.windows(2) {
910                let [left, right] = pair else {
911                    continue;
912                };
913                if Value::canonical_cmp(left, right) != Ordering::Less {
914                    return Err(InternalError::persisted_row_field_encode_failed(
915                        field_name,
916                        "set payload must already be canonical and deduplicated",
917                    ));
918                }
919            }
920
921            Ok(())
922        }
923        (FieldKind::Map { .. }, Value::Map(entries)) => {
924            Value::validate_map_entries(entries.as_slice())
925                .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?;
926
927            if !Value::map_entries_are_strictly_canonical(entries.as_slice()) {
928                return Err(InternalError::persisted_row_field_encode_failed(
929                    field_name,
930                    "map payload must already be canonical and deduplicated",
931                ));
932            }
933
934            Ok(())
935        }
936        _ => Ok(()),
937    }
938}
939
940// Materialize the last-write-wins serialized patch view indexed by stable slot.
941fn serialized_patch_payload_by_slot<'a>(
942    model: &'static EntityModel,
943    patch: &'a SerializedUpdatePatch,
944) -> Result<Vec<Option<&'a [u8]>>, InternalError> {
945    let mut payloads = vec![None; model.fields().len()];
946
947    for entry in patch.entries() {
948        let slot = entry.slot().index();
949        field_model_for_slot(model, slot)?;
950        payloads[slot] = Some(entry.payload());
951    }
952
953    Ok(payloads)
954}
955
956// Encode one `ByKind` field payload into the raw CBOR shape expected by the
957// structural field decoder.
958fn encode_structural_field_bytes_by_kind(
959    kind: FieldKind,
960    value: &Value,
961    field_name: &str,
962) -> Result<Vec<u8>, InternalError> {
963    let cbor_value = encode_structural_field_cbor_by_kind(kind, value, field_name)?;
964
965    serialize(&cbor_value)
966        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
967}
968
969// Encode one `ByKind` field payload into its raw CBOR value form.
970fn encode_structural_field_cbor_by_kind(
971    kind: FieldKind,
972    value: &Value,
973    field_name: &str,
974) -> Result<CborValue, InternalError> {
975    match (kind, value) {
976        (_, Value::Null) => Ok(CborValue::Null),
977        (FieldKind::Blob, Value::Blob(value)) => Ok(CborValue::Bytes(value.clone())),
978        (FieldKind::Bool, Value::Bool(value)) => Ok(CborValue::Bool(*value)),
979        (FieldKind::Text, Value::Text(value)) => Ok(CborValue::Text(value.clone())),
980        (FieldKind::Int, Value::Int(value)) => Ok(CborValue::Integer(i128::from(*value))),
981        (FieldKind::Uint, Value::Uint(value)) => Ok(CborValue::Integer(i128::from(*value))),
982        (FieldKind::Float32, Value::Float32(value)) => to_cbor_value(value)
983            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
984        (FieldKind::Float64, Value::Float64(value)) => to_cbor_value(value)
985            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
986        (FieldKind::Int128, Value::Int128(value)) => to_cbor_value(value)
987            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
988        (FieldKind::Uint128, Value::Uint128(value)) => to_cbor_value(value)
989            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
990        (FieldKind::Ulid, Value::Ulid(value)) => Ok(CborValue::Text(value.to_string())),
991        (FieldKind::Account, Value::Account(value)) => encode_leaf_cbor_value(value, field_name),
992        (FieldKind::Date, Value::Date(value)) => encode_leaf_cbor_value(value, field_name),
993        (FieldKind::Decimal { .. }, Value::Decimal(value)) => {
994            encode_leaf_cbor_value(value, field_name)
995        }
996        (FieldKind::Duration, Value::Duration(value)) => encode_leaf_cbor_value(value, field_name),
997        (FieldKind::IntBig, Value::IntBig(value)) => encode_leaf_cbor_value(value, field_name),
998        (FieldKind::Principal, Value::Principal(value)) => {
999            encode_leaf_cbor_value(value, field_name)
1000        }
1001        (FieldKind::Subaccount, Value::Subaccount(value)) => {
1002            encode_leaf_cbor_value(value, field_name)
1003        }
1004        (FieldKind::Timestamp, Value::Timestamp(value)) => {
1005            encode_leaf_cbor_value(value, field_name)
1006        }
1007        (FieldKind::UintBig, Value::UintBig(value)) => encode_leaf_cbor_value(value, field_name),
1008        (FieldKind::Unit, Value::Unit) => encode_leaf_cbor_value(&(), field_name),
1009        (FieldKind::Relation { key_kind, .. }, value) => {
1010            encode_structural_field_cbor_by_kind(*key_kind, value, field_name)
1011        }
1012        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
1013            Ok(CborValue::Array(
1014                items
1015                    .iter()
1016                    .map(|item| encode_structural_field_cbor_by_kind(*inner, item, field_name))
1017                    .collect::<Result<Vec<_>, _>>()?,
1018            ))
1019        }
1020        (FieldKind::Map { key, value }, Value::Map(entries)) => {
1021            let mut encoded = BTreeMap::new();
1022            for (entry_key, entry_value) in entries {
1023                encoded.insert(
1024                    encode_structural_field_cbor_by_kind(*key, entry_key, field_name)?,
1025                    encode_structural_field_cbor_by_kind(*value, entry_value, field_name)?,
1026                );
1027            }
1028
1029            Ok(CborValue::Map(encoded))
1030        }
1031        (FieldKind::Enum { path, variants }, Value::Enum(value)) => {
1032            encode_enum_cbor_value(path, variants, value, field_name)
1033        }
1034        (FieldKind::Structured { .. }, _) => Err(InternalError::persisted_row_field_encode_failed(
1035            field_name,
1036            "structured ByKind field encoding is unsupported",
1037        )),
1038        _ => Err(InternalError::persisted_row_field_encode_failed(
1039            field_name,
1040            format!("field kind {kind:?} does not accept runtime value {value:?}"),
1041        )),
1042    }
1043}
1044
1045// Encode one typed leaf wrapper into its raw CBOR value form.
1046fn encode_leaf_cbor_value<T>(value: &T, field_name: &str) -> Result<CborValue, InternalError>
1047where
1048    T: serde::Serialize,
1049{
1050    to_cbor_value(value)
1051        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
1052}
1053
1054// Encode one enum field using the same unit-vs-one-entry-map envelope expected
1055// by structural enum decode.
1056fn encode_enum_cbor_value(
1057    path: &'static str,
1058    variants: &'static [crate::model::field::EnumVariantModel],
1059    value: &ValueEnum,
1060    field_name: &str,
1061) -> Result<CborValue, InternalError> {
1062    if let Some(actual_path) = value.path()
1063        && actual_path != path
1064    {
1065        return Err(InternalError::persisted_row_field_encode_failed(
1066            field_name,
1067            format!("enum path mismatch: expected '{path}', found '{actual_path}'"),
1068        ));
1069    }
1070
1071    let Some(payload) = value.payload() else {
1072        return Ok(CborValue::Text(value.variant().to_string()));
1073    };
1074
1075    let Some(variant_model) = variants.iter().find(|item| item.ident() == value.variant()) else {
1076        return Err(InternalError::persisted_row_field_encode_failed(
1077            field_name,
1078            format!(
1079                "unknown enum variant '{}' for path '{path}'",
1080                value.variant()
1081            ),
1082        ));
1083    };
1084    let Some(payload_kind) = variant_model.payload_kind() else {
1085        return Err(InternalError::persisted_row_field_encode_failed(
1086            field_name,
1087            format!(
1088                "enum variant '{}' does not accept a payload",
1089                value.variant()
1090            ),
1091        ));
1092    };
1093
1094    let payload_value = match variant_model.payload_storage_decode() {
1095        FieldStorageDecode::ByKind => {
1096            encode_structural_field_cbor_by_kind(*payload_kind, payload, field_name)?
1097        }
1098        FieldStorageDecode::Value => to_cbor_value(payload)
1099            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?,
1100    };
1101
1102    let mut encoded = BTreeMap::new();
1103    encoded.insert(CborValue::Text(value.variant().to_string()), payload_value);
1104
1105    Ok(CborValue::Map(encoded))
1106}
1107
1108// Resolve one field model entry by stable slot index.
1109fn field_model_for_slot(
1110    model: &'static EntityModel,
1111    slot: usize,
1112) -> Result<&'static FieldModel, InternalError> {
1113    model
1114        .fields()
1115        .get(slot)
1116        .ok_or_else(|| InternalError::persisted_row_slot_lookup_out_of_bounds(model.path(), slot))
1117}
1118
1119///
1120/// SlotBufferWriter
1121///
1122/// SlotBufferWriter captures one dense canonical row worth of slot payloads
1123/// before they are encoded into the canonical slot container.
1124///
1125
1126pub(in crate::db) struct SlotBufferWriter {
1127    model: &'static EntityModel,
1128    slots: Vec<SlotBufferSlot>,
1129}
1130
1131impl SlotBufferWriter {
1132    /// Build one empty slot buffer for one entity model.
1133    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
1134        Self {
1135            model,
1136            slots: vec![SlotBufferSlot::Missing; model.fields().len()],
1137        }
1138    }
1139
1140    /// Encode the buffered slots into the canonical row payload.
1141    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
1142        let slot_count = self.slots.len();
1143        let mut payload_bytes = Vec::new();
1144        let mut slot_table = Vec::with_capacity(slot_count);
1145
1146        // Phase 1: require one payload for every declared slot before the row
1147        // can cross the canonical persisted-row boundary.
1148        for (slot, slot_payload) in self.slots.into_iter().enumerate() {
1149            match slot_payload {
1150                SlotBufferSlot::Set(bytes) => {
1151                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
1152                        InternalError::persisted_row_encode_failed(
1153                            "slot payload start exceeds u32 range",
1154                        )
1155                    })?;
1156                    let len = u32::try_from(bytes.len()).map_err(|_| {
1157                        InternalError::persisted_row_encode_failed(
1158                            "slot payload length exceeds u32 range",
1159                        )
1160                    })?;
1161                    payload_bytes.extend_from_slice(&bytes);
1162                    slot_table.push((start, len));
1163                }
1164                SlotBufferSlot::Missing => {
1165                    return Err(InternalError::persisted_row_encode_failed(format!(
1166                        "slot buffer writer did not emit slot {slot} for entity '{}'",
1167                        self.model.path()
1168                    )));
1169                }
1170            }
1171        }
1172
1173        // Phase 2: flatten the slot table plus payload bytes into the canonical row image.
1174        encode_slot_payload_from_parts(slot_count, slot_table.as_slice(), payload_bytes.as_slice())
1175    }
1176}
1177
1178impl SlotWriter for SlotBufferWriter {
1179    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1180        let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
1181        let payload = required_slot_payload_bytes(self.model, "slot buffer writer", slot, payload)?;
1182        *entry = SlotBufferSlot::Set(payload.to_vec());
1183
1184        Ok(())
1185    }
1186}
1187
1188///
1189/// SlotBufferSlot
1190///
1191/// SlotBufferSlot tracks whether one canonical row encoder has emitted a
1192/// payload for every declared slot before flattening the row payload.
1193///
1194
1195#[derive(Clone, Debug, Eq, PartialEq)]
1196enum SlotBufferSlot {
1197    Missing,
1198    Set(Vec<u8>),
1199}
1200
1201///
1202/// SerializedPatchWriter
1203///
1204/// SerializedPatchWriter
1205///
1206/// SerializedPatchWriter captures a dense typed entity slot image into the
1207/// serialized patch artifact used by `0.64` mutation staging.
1208/// Unlike `SlotBufferWriter`, this writer does not flatten into one row payload;
1209/// it preserves slot-level ownership so later stages can replay the row through
1210/// the structural patch boundary.
1211///
1212
1213struct SerializedPatchWriter {
1214    model: &'static EntityModel,
1215    slots: Vec<PatchWriterSlot>,
1216}
1217
1218impl SerializedPatchWriter {
1219    /// Build one empty serialized patch writer for one entity model.
1220    fn for_model(model: &'static EntityModel) -> Self {
1221        Self {
1222            model,
1223            slots: vec![PatchWriterSlot::Missing; model.fields().len()],
1224        }
1225    }
1226
1227    /// Materialize one dense serialized patch, erroring if the writer failed
1228    /// to emit any declared slot.
1229    fn finish_complete(self) -> Result<SerializedUpdatePatch, InternalError> {
1230        let mut entries = Vec::with_capacity(self.slots.len());
1231
1232        // Phase 1: require a complete slot image so typed save/update staging
1233        // stays equivalent to the existing full-row encoder.
1234        for (slot, payload) in self.slots.into_iter().enumerate() {
1235            let field_slot = FieldSlot::from_index(self.model, slot)?;
1236            let serialized = match payload {
1237                PatchWriterSlot::Set(payload) => SerializedFieldUpdate::new(field_slot, payload),
1238                PatchWriterSlot::Missing => {
1239                    return Err(InternalError::persisted_row_encode_failed(format!(
1240                        "serialized patch writer did not emit slot {slot} for entity '{}'",
1241                        self.model.path()
1242                    )));
1243                }
1244            };
1245            entries.push(serialized);
1246        }
1247
1248        Ok(SerializedUpdatePatch::new(entries))
1249    }
1250}
1251
1252impl SlotWriter for SerializedPatchWriter {
1253    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1254        let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
1255        let payload =
1256            required_slot_payload_bytes(self.model, "serialized patch writer", slot, payload)?;
1257        *entry = PatchWriterSlot::Set(payload.to_vec());
1258
1259        Ok(())
1260    }
1261}
1262
1263///
1264/// PatchWriterSlot
1265///
1266/// PatchWriterSlot
1267///
1268/// PatchWriterSlot tracks whether one dense slot-image writer has emitted a
1269/// payload or failed to visit the slot at all.
1270/// That lets the typed save/update bridge reject incomplete writers instead of
1271/// silently leaving stale bytes in the baseline row.
1272///
1273
1274#[derive(Clone, Debug, Eq, PartialEq)]
1275enum PatchWriterSlot {
1276    Missing,
1277    Set(Vec<u8>),
1278}
1279
1280///
1281/// StructuralSlotReader
1282///
1283/// StructuralSlotReader adapts the current persisted-row bytes into the
1284/// canonical slot-reader seam.
1285/// It validates row shape and fully decodes every declared field before any
1286/// consumer can observe the row, then keeps those decoded values cached so
1287/// later index/predicate reads do not re-run field decoders.
1288///
1289
1290pub(in crate::db) struct StructuralSlotReader<'a> {
1291    model: &'static EntityModel,
1292    field_bytes: StructuralRowFieldBytes<'a>,
1293    cached_values: Vec<CachedSlotValue>,
1294}
1295
1296impl<'a> StructuralSlotReader<'a> {
1297    /// Build one slot reader over one persisted row using the current structural row scanner.
1298    pub(in crate::db) fn from_raw_row(
1299        raw_row: &'a RawRow,
1300        model: &'static EntityModel,
1301    ) -> Result<Self, InternalError> {
1302        let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
1303            .map_err(StructuralRowDecodeError::into_internal_error)?;
1304        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
1305            .take(model.fields().len())
1306            .collect();
1307        let mut reader = Self {
1308            model,
1309            field_bytes,
1310            cached_values,
1311        };
1312
1313        // Phase 1: force every declared slot through the owned field decode
1314        // contract once so malformed but unreferenced payloads cannot stay
1315        // latent behind consumer-specific partial decode paths.
1316        reader.decode_all_declared_slots()?;
1317
1318        Ok(reader)
1319    }
1320
1321    /// Validate the decoded primary-key slot against the authoritative row key.
1322    pub(in crate::db) fn validate_storage_key(
1323        &self,
1324        data_key: &DataKey,
1325    ) -> Result<(), InternalError> {
1326        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
1327            return Err(InternalError::persisted_row_primary_key_field_missing(
1328                self.model.path(),
1329            ));
1330        };
1331        let field = self.field_model(primary_key_slot)?;
1332        let decoded_key = match self.get_scalar(primary_key_slot)? {
1333            Some(ScalarSlotValueRef::Null) => None,
1334            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
1335            None => Some(
1336                decode_storage_key_field_bytes(
1337                    self.required_field_bytes(primary_key_slot, field.name())?,
1338                    field.kind,
1339                )
1340                .map_err(|err| {
1341                    InternalError::persisted_row_primary_key_not_storage_encodable(data_key, err)
1342                })?,
1343            ),
1344        };
1345        let Some(decoded_key) = decoded_key else {
1346            return Err(InternalError::persisted_row_primary_key_slot_missing(
1347                data_key,
1348            ));
1349        };
1350        let expected_key = data_key.storage_key();
1351
1352        if decoded_key != expected_key {
1353            return Err(InternalError::persisted_row_key_mismatch(
1354                expected_key,
1355                decoded_key,
1356            ));
1357        }
1358
1359        Ok(())
1360    }
1361
1362    // Resolve one field model entry by stable slot index.
1363    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
1364        field_model_for_slot(self.model, slot)
1365    }
1366
1367    // Decode every declared slot exactly once at the structural row boundary so
1368    // later consumers inherit one globally enforced canonical-row contract.
1369    fn decode_all_declared_slots(&mut self) -> Result<(), InternalError> {
1370        for slot in 0..self.model.fields().len() {
1371            let _ = self.get_value(slot)?;
1372        }
1373
1374        Ok(())
1375    }
1376
1377    // Borrow one declared slot payload, treating absence as a persisted-row
1378    // invariant violation instead of a normal structural branch.
1379    pub(in crate::db) fn required_field_bytes(
1380        &self,
1381        slot: usize,
1382        field_name: &str,
1383    ) -> Result<&[u8], InternalError> {
1384        self.field_bytes
1385            .field(slot)
1386            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field_name))
1387    }
1388}
1389
1390// Convert one scalar slot fast-path value into its storage-key form when the
1391// field kind is storage-key-compatible.
1392const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
1393    match value {
1394        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
1395        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
1396        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
1397        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
1398        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
1399        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
1400        ScalarValueRef::Unit => Some(StorageKey::Unit),
1401        _ => None,
1402    }
1403}
1404
1405impl SlotReader for StructuralSlotReader<'_> {
1406    fn model(&self) -> &'static EntityModel {
1407        self.model
1408    }
1409
1410    fn has(&self, slot: usize) -> bool {
1411        self.field_bytes.field(slot).is_some()
1412    }
1413
1414    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
1415        self.field_bytes.field(slot)
1416    }
1417
1418    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
1419        let field = self.field_model(slot)?;
1420
1421        match field.leaf_codec() {
1422            LeafCodec::Scalar(codec) => decode_scalar_slot_value(
1423                self.required_field_bytes(slot, field.name())?,
1424                codec,
1425                field.name(),
1426            )
1427            .map(Some),
1428            LeafCodec::CborFallback => Ok(None),
1429        }
1430    }
1431
1432    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
1433        let cached = self.cached_values.get(slot).ok_or_else(|| {
1434            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(self.model.path(), slot)
1435        })?;
1436        if let CachedSlotValue::Decoded(value) = cached {
1437            return Ok(Some(value.clone()));
1438        }
1439
1440        let field = self.field_model(slot)?;
1441        let value = decode_slot_value_from_bytes(
1442            self.model,
1443            slot,
1444            self.required_field_bytes(slot, field.name())?,
1445        )?;
1446        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
1447
1448        Ok(Some(value))
1449    }
1450}
1451
1452impl CanonicalSlotReader for StructuralSlotReader<'_> {}
1453
1454///
1455/// CachedSlotValue
1456///
1457/// CachedSlotValue tracks whether one slot has already been decoded during the
1458/// current structural row access pass.
1459///
1460
1461enum CachedSlotValue {
1462    Pending,
1463    Decoded(Value),
1464}
1465
1466///
1467/// TESTS
1468///
1469
1470#[cfg(test)]
1471mod tests {
1472    use super::{
1473        FieldSlot, ScalarSlotValueRef, ScalarValueRef, SerializedFieldUpdate,
1474        SerializedPatchWriter, SerializedUpdatePatch, SlotBufferWriter, SlotReader, SlotWriter,
1475        UpdatePatch, apply_serialized_update_patch_to_raw_row, apply_update_patch_to_raw_row,
1476        decode_persisted_custom_many_slot_payload, decode_persisted_custom_slot_payload,
1477        decode_persisted_non_null_slot_payload, decode_persisted_option_slot_payload,
1478        decode_persisted_slot_payload, decode_slot_value_by_contract, decode_slot_value_from_bytes,
1479        encode_persisted_custom_many_slot_payload, encode_persisted_custom_slot_payload,
1480        encode_scalar_slot_value, encode_slot_payload_from_parts, encode_slot_value_from_value,
1481        serialize_entity_slots_as_update_patch, serialize_update_patch_fields,
1482    };
1483    use crate::{
1484        db::{
1485            codec::serialize_row_payload,
1486            data::{RawRow, StructuralSlotReader},
1487        },
1488        error::InternalError,
1489        model::{
1490            EntityModel,
1491            field::{EnumVariantModel, FieldKind, FieldModel, FieldStorageDecode},
1492        },
1493        testing::SIMPLE_ENTITY_TAG,
1494        traits::{EntitySchema, FieldValue},
1495        types::{Account, Principal, Subaccount},
1496        value::{Value, ValueEnum},
1497    };
1498    use icydb_derive::{FieldProjection, PersistedRow};
1499    use serde::{Deserialize, Serialize};
1500
1501    crate::test_canister! {
1502        ident = PersistedRowPatchBridgeCanister,
1503        commit_memory_id = crate::testing::test_commit_memory_id(),
1504    }
1505
1506    crate::test_store! {
1507        ident = PersistedRowPatchBridgeStore,
1508        canister = PersistedRowPatchBridgeCanister,
1509    }
1510
1511    ///
1512    /// PersistedRowPatchBridgeEntity
1513    ///
1514    /// PersistedRowPatchBridgeEntity
1515    ///
1516    /// PersistedRowPatchBridgeEntity is the smallest derive-owned entity used
1517    /// to validate the typed-entity -> serialized-patch bridge.
1518    /// It lets the persisted-row tests exercise the same dense slot writer the
1519    /// save/update path now uses.
1520    ///
1521
1522    #[derive(
1523        Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, PersistedRow, Serialize,
1524    )]
1525    struct PersistedRowPatchBridgeEntity {
1526        id: crate::types::Ulid,
1527        name: String,
1528    }
1529
1530    #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
1531    struct PersistedRowProfileValue {
1532        bio: String,
1533    }
1534
1535    impl FieldValue for PersistedRowProfileValue {
1536        fn kind() -> crate::traits::FieldValueKind {
1537            crate::traits::FieldValueKind::Structured { queryable: false }
1538        }
1539
1540        fn to_value(&self) -> Value {
1541            Value::from_map(vec![(
1542                Value::Text("bio".to_string()),
1543                Value::Text(self.bio.clone()),
1544            )])
1545            .expect("profile test value should encode as canonical map")
1546        }
1547
1548        fn from_value(value: &Value) -> Option<Self> {
1549            let Value::Map(entries) = value else {
1550                return None;
1551            };
1552            let normalized = Value::normalize_map_entries(entries.clone()).ok()?;
1553            let bio = normalized
1554                .iter()
1555                .find_map(|(entry_key, entry_value)| match entry_key {
1556                    Value::Text(entry_key) if entry_key == "bio" => match entry_value {
1557                        Value::Text(bio) => Some(bio.clone()),
1558                        _ => None,
1559                    },
1560                    _ => None,
1561                })?;
1562
1563            if normalized.len() != 1 {
1564                return None;
1565            }
1566
1567            Some(Self { bio })
1568        }
1569    }
1570
1571    crate::test_entity_schema! {
1572        ident = PersistedRowPatchBridgeEntity,
1573        id = crate::types::Ulid,
1574        id_field = id,
1575        entity_name = "PersistedRowPatchBridgeEntity",
1576        entity_tag = SIMPLE_ENTITY_TAG,
1577        pk_index = 0,
1578        fields = [
1579            ("id", FieldKind::Ulid),
1580            ("name", FieldKind::Text),
1581        ],
1582        indexes = [],
1583        store = PersistedRowPatchBridgeStore,
1584        canister = PersistedRowPatchBridgeCanister,
1585    }
1586
1587    static STATE_VARIANTS: &[EnumVariantModel] = &[EnumVariantModel::new(
1588        "Loaded",
1589        Some(&FieldKind::Uint),
1590        FieldStorageDecode::ByKind,
1591    )];
1592    static FIELD_MODELS: [FieldModel; 2] = [
1593        FieldModel::new("name", FieldKind::Text),
1594        FieldModel::new_with_storage_decode("payload", FieldKind::Text, FieldStorageDecode::Value),
1595    ];
1596    static LIST_FIELD_MODELS: [FieldModel; 1] =
1597        [FieldModel::new("tags", FieldKind::List(&FieldKind::Text))];
1598    static MAP_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
1599        "props",
1600        FieldKind::Map {
1601            key: &FieldKind::Text,
1602            value: &FieldKind::Uint,
1603        },
1604    )];
1605    static ENUM_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
1606        "state",
1607        FieldKind::Enum {
1608            path: "tests::State",
1609            variants: STATE_VARIANTS,
1610        },
1611    )];
1612    static ACCOUNT_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new("owner", FieldKind::Account)];
1613    static REQUIRED_STRUCTURED_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
1614        "profile",
1615        FieldKind::Structured { queryable: false },
1616    )];
1617    static OPTIONAL_STRUCTURED_FIELD_MODELS: [FieldModel; 1] =
1618        [FieldModel::new_with_storage_decode_and_nullability(
1619            "profile",
1620            FieldKind::Structured { queryable: false },
1621            FieldStorageDecode::ByKind,
1622            true,
1623        )];
1624    static STRUCTURED_MAP_VALUE_KIND: FieldKind = FieldKind::Structured { queryable: false };
1625    static STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS: [FieldModel; 1] =
1626        [FieldModel::new_with_storage_decode(
1627            "projects",
1628            FieldKind::Map {
1629                key: &FieldKind::Principal,
1630                value: &STRUCTURED_MAP_VALUE_KIND,
1631            },
1632            FieldStorageDecode::Value,
1633        )];
1634    static INDEX_MODELS: [&crate::model::index::IndexModel; 0] = [];
1635    static TEST_MODEL: EntityModel = EntityModel::new(
1636        "tests::PersistedRowFieldCodecEntity",
1637        "persisted_row_field_codec_entity",
1638        &FIELD_MODELS[0],
1639        &FIELD_MODELS,
1640        &INDEX_MODELS,
1641    );
1642    static LIST_MODEL: EntityModel = EntityModel::new(
1643        "tests::PersistedRowListFieldCodecEntity",
1644        "persisted_row_list_field_codec_entity",
1645        &LIST_FIELD_MODELS[0],
1646        &LIST_FIELD_MODELS,
1647        &INDEX_MODELS,
1648    );
1649    static MAP_MODEL: EntityModel = EntityModel::new(
1650        "tests::PersistedRowMapFieldCodecEntity",
1651        "persisted_row_map_field_codec_entity",
1652        &MAP_FIELD_MODELS[0],
1653        &MAP_FIELD_MODELS,
1654        &INDEX_MODELS,
1655    );
1656    static ENUM_MODEL: EntityModel = EntityModel::new(
1657        "tests::PersistedRowEnumFieldCodecEntity",
1658        "persisted_row_enum_field_codec_entity",
1659        &ENUM_FIELD_MODELS[0],
1660        &ENUM_FIELD_MODELS,
1661        &INDEX_MODELS,
1662    );
1663    static ACCOUNT_MODEL: EntityModel = EntityModel::new(
1664        "tests::PersistedRowAccountFieldCodecEntity",
1665        "persisted_row_account_field_codec_entity",
1666        &ACCOUNT_FIELD_MODELS[0],
1667        &ACCOUNT_FIELD_MODELS,
1668        &INDEX_MODELS,
1669    );
1670    static REQUIRED_STRUCTURED_MODEL: EntityModel = EntityModel::new(
1671        "tests::PersistedRowRequiredStructuredFieldCodecEntity",
1672        "persisted_row_required_structured_field_codec_entity",
1673        &REQUIRED_STRUCTURED_FIELD_MODELS[0],
1674        &REQUIRED_STRUCTURED_FIELD_MODELS,
1675        &INDEX_MODELS,
1676    );
1677    static OPTIONAL_STRUCTURED_MODEL: EntityModel = EntityModel::new(
1678        "tests::PersistedRowOptionalStructuredFieldCodecEntity",
1679        "persisted_row_optional_structured_field_codec_entity",
1680        &OPTIONAL_STRUCTURED_FIELD_MODELS[0],
1681        &OPTIONAL_STRUCTURED_FIELD_MODELS,
1682        &INDEX_MODELS,
1683    );
1684    static STRUCTURED_MAP_VALUE_STORAGE_MODEL: EntityModel = EntityModel::new(
1685        "tests::PersistedRowStructuredMapValueStorageEntity",
1686        "persisted_row_structured_map_value_storage_entity",
1687        &STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS[0],
1688        &STRUCTURED_MAP_VALUE_STORAGE_FIELD_MODELS,
1689        &INDEX_MODELS,
1690    );
1691
1692    fn encode_slot_payload_allowing_missing_for_tests(
1693        model: &'static EntityModel,
1694        slots: &[Option<&[u8]>],
1695    ) -> Result<Vec<u8>, InternalError> {
1696        if slots.len() != model.fields().len() {
1697            return Err(InternalError::persisted_row_encode_failed(format!(
1698                "noncanonical slot payload test helper expected {} slots for entity '{}', found {}",
1699                model.fields().len(),
1700                model.path(),
1701                slots.len()
1702            )));
1703        }
1704        let mut payload_bytes = Vec::new();
1705        let mut slot_table = Vec::with_capacity(slots.len());
1706
1707        for slot_payload in slots {
1708            match slot_payload {
1709                Some(bytes) => {
1710                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
1711                        InternalError::persisted_row_encode_failed(
1712                            "slot payload start exceeds u32 range",
1713                        )
1714                    })?;
1715                    let len = u32::try_from(bytes.len()).map_err(|_| {
1716                        InternalError::persisted_row_encode_failed(
1717                            "slot payload length exceeds u32 range",
1718                        )
1719                    })?;
1720                    payload_bytes.extend_from_slice(bytes);
1721                    slot_table.push((start, len));
1722                }
1723                None => slot_table.push((0_u32, 0_u32)),
1724            }
1725        }
1726
1727        encode_slot_payload_from_parts(slots.len(), slot_table.as_slice(), payload_bytes.as_slice())
1728    }
1729
1730    #[test]
1731    fn decode_slot_value_from_bytes_decodes_scalar_slots_through_one_owner() {
1732        let payload =
1733            encode_scalar_slot_value(ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")));
1734        let value =
1735            decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
1736
1737        assert_eq!(value, Value::Text("Ada".to_string()));
1738    }
1739
1740    #[test]
1741    fn decode_slot_value_from_bytes_reports_scalar_prefix_bytes() {
1742        let err = decode_slot_value_from_bytes(&TEST_MODEL, 0, &[0x00, 1])
1743            .expect_err("invalid scalar slot prefix should fail closed");
1744
1745        assert!(
1746            err.message
1747                .contains("expected slot envelope prefix byte 0xFF, found 0x00"),
1748            "unexpected error: {err:?}"
1749        );
1750    }
1751
1752    #[test]
1753    fn decode_slot_value_from_bytes_respects_value_storage_decode_contract() {
1754        let payload = crate::serialize::serialize(&Value::Text("Ada".to_string()))
1755            .expect("encode value-storage payload");
1756
1757        let value =
1758            decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
1759
1760        assert_eq!(value, Value::Text("Ada".to_string()));
1761    }
1762
1763    #[test]
1764    fn encode_slot_value_from_value_roundtrips_scalar_slots() {
1765        let payload = encode_slot_value_from_value(&TEST_MODEL, 0, &Value::Text("Ada".to_string()))
1766            .expect("encode slot");
1767        let decoded =
1768            decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
1769
1770        assert_eq!(decoded, Value::Text("Ada".to_string()));
1771    }
1772
1773    #[test]
1774    fn encode_slot_value_from_value_roundtrips_value_storage_slots() {
1775        let payload = encode_slot_value_from_value(&TEST_MODEL, 1, &Value::Text("Ada".to_string()))
1776            .expect("encode slot");
1777        let decoded =
1778            decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
1779
1780        assert_eq!(decoded, Value::Text("Ada".to_string()));
1781    }
1782
1783    #[test]
1784    fn encode_slot_value_from_value_roundtrips_list_by_kind_slots() {
1785        let payload = encode_slot_value_from_value(
1786            &LIST_MODEL,
1787            0,
1788            &Value::List(vec![Value::Text("alpha".to_string())]),
1789        )
1790        .expect("encode list slot");
1791        let decoded =
1792            decode_slot_value_from_bytes(&LIST_MODEL, 0, payload.as_slice()).expect("decode slot");
1793
1794        assert_eq!(decoded, Value::List(vec![Value::Text("alpha".to_string())]),);
1795    }
1796
1797    #[test]
1798    fn encode_slot_value_from_value_roundtrips_map_by_kind_slots() {
1799        let payload = encode_slot_value_from_value(
1800            &MAP_MODEL,
1801            0,
1802            &Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
1803        )
1804        .expect("encode map slot");
1805        let decoded =
1806            decode_slot_value_from_bytes(&MAP_MODEL, 0, payload.as_slice()).expect("decode slot");
1807
1808        assert_eq!(
1809            decoded,
1810            Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
1811        );
1812    }
1813
1814    #[test]
1815    fn encode_slot_value_from_value_accepts_value_storage_maps_with_structured_values() {
1816        let principal = Principal::dummy(7);
1817        let project = Value::from_map(vec![
1818            (Value::Text("pid".to_string()), Value::Principal(principal)),
1819            (
1820                Value::Text("status".to_string()),
1821                Value::Enum(ValueEnum::new(
1822                    "Saved",
1823                    Some("design::app::user::customise::project::ProjectStatus"),
1824                )),
1825            ),
1826        ])
1827        .expect("project value should normalize into a canonical map");
1828        let projects = Value::from_map(vec![(Value::Principal(principal), project)])
1829            .expect("outer map should normalize into a canonical map");
1830
1831        let payload =
1832            encode_slot_value_from_value(&STRUCTURED_MAP_VALUE_STORAGE_MODEL, 0, &projects)
1833                .expect("encode structured map slot");
1834        let decoded = decode_slot_value_from_bytes(
1835            &STRUCTURED_MAP_VALUE_STORAGE_MODEL,
1836            0,
1837            payload.as_slice(),
1838        )
1839        .expect("decode structured map slot");
1840
1841        assert_eq!(decoded, projects);
1842    }
1843
1844    #[test]
1845    fn encode_slot_value_from_value_roundtrips_enum_by_kind_slots() {
1846        let payload = encode_slot_value_from_value(
1847            &ENUM_MODEL,
1848            0,
1849            &Value::Enum(
1850                ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7)),
1851            ),
1852        )
1853        .expect("encode enum slot");
1854        let decoded =
1855            decode_slot_value_from_bytes(&ENUM_MODEL, 0, payload.as_slice()).expect("decode slot");
1856
1857        assert_eq!(
1858            decoded,
1859            Value::Enum(
1860                ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7,))
1861            ),
1862        );
1863    }
1864
1865    #[test]
1866    fn encode_slot_value_from_value_roundtrips_leaf_by_kind_wrapper_slots() {
1867        let account = Account::from_parts(Principal::dummy(7), Some(Subaccount::from([7_u8; 32])));
1868        let payload = encode_slot_value_from_value(&ACCOUNT_MODEL, 0, &Value::Account(account))
1869            .expect("encode account slot");
1870        let decoded = decode_slot_value_from_bytes(&ACCOUNT_MODEL, 0, payload.as_slice())
1871            .expect("decode slot");
1872
1873        assert_eq!(decoded, Value::Account(account));
1874    }
1875
1876    #[test]
1877    fn custom_slot_payload_roundtrips_structured_field_value() {
1878        let profile = PersistedRowProfileValue {
1879            bio: "Ada".to_string(),
1880        };
1881        let payload = encode_persisted_custom_slot_payload(&profile, "profile")
1882            .expect("encode custom structured payload");
1883        let decoded = decode_persisted_custom_slot_payload::<PersistedRowProfileValue>(
1884            payload.as_slice(),
1885            "profile",
1886        )
1887        .expect("decode custom structured payload");
1888
1889        assert_eq!(decoded, profile);
1890        assert_eq!(
1891            decode_persisted_slot_payload::<Value>(payload.as_slice(), "profile")
1892                .expect("decode raw value payload"),
1893            profile.to_value(),
1894        );
1895    }
1896
1897    #[test]
1898    fn custom_many_slot_payload_roundtrips_structured_value_lists() {
1899        let profiles = vec![
1900            PersistedRowProfileValue {
1901                bio: "Ada".to_string(),
1902            },
1903            PersistedRowProfileValue {
1904                bio: "Grace".to_string(),
1905            },
1906        ];
1907        let payload = encode_persisted_custom_many_slot_payload(profiles.as_slice(), "profiles")
1908            .expect("encode custom structured list payload");
1909        let decoded = decode_persisted_custom_many_slot_payload::<PersistedRowProfileValue>(
1910            payload.as_slice(),
1911            "profiles",
1912        )
1913        .expect("decode custom structured list payload");
1914
1915        assert_eq!(decoded, profiles);
1916    }
1917
1918    #[test]
1919    fn decode_persisted_non_null_slot_payload_rejects_null_for_required_structured_fields() {
1920        let err =
1921            decode_persisted_non_null_slot_payload::<PersistedRowProfileValue>(&[0xF6], "profile")
1922                .expect_err("required structured payload must reject null");
1923
1924        assert!(
1925            err.message
1926                .contains("unexpected null for non-nullable field"),
1927            "unexpected error: {err:?}"
1928        );
1929    }
1930
1931    #[test]
1932    fn decode_persisted_option_slot_payload_treats_cbor_null_as_none() {
1933        let decoded =
1934            decode_persisted_option_slot_payload::<PersistedRowProfileValue>(&[0xF6], "profile")
1935                .expect("optional structured payload should decode");
1936
1937        assert_eq!(decoded, None);
1938    }
1939
1940    #[test]
1941    fn encode_slot_value_from_value_rejects_null_for_required_structured_slots() {
1942        let err = encode_slot_value_from_value(&REQUIRED_STRUCTURED_MODEL, 0, &Value::Null)
1943            .expect_err("required structured slot must reject null");
1944
1945        assert!(
1946            err.message.contains("required field cannot store null"),
1947            "unexpected error: {err:?}"
1948        );
1949    }
1950
1951    #[test]
1952    fn encode_slot_value_from_value_allows_null_for_optional_structured_slots() {
1953        let payload = encode_slot_value_from_value(&OPTIONAL_STRUCTURED_MODEL, 0, &Value::Null)
1954            .expect("optional structured slot should allow null");
1955        let decoded =
1956            decode_slot_value_from_bytes(&OPTIONAL_STRUCTURED_MODEL, 0, payload.as_slice())
1957                .expect("optional structured slot should decode");
1958
1959        assert_eq!(decoded, Value::Null);
1960    }
1961
1962    #[test]
1963    fn encode_slot_value_from_value_rejects_unknown_enum_payload_variants() {
1964        let err = encode_slot_value_from_value(
1965            &ENUM_MODEL,
1966            0,
1967            &Value::Enum(
1968                ValueEnum::new("Unknown", Some("tests::State")).with_payload(Value::Uint(7)),
1969            ),
1970        )
1971        .expect_err("unknown enum payload should fail closed");
1972
1973        assert!(err.message.contains("unknown enum variant"));
1974    }
1975
1976    #[test]
1977    fn structural_slot_reader_and_direct_decode_share_the_same_field_codec_boundary() {
1978        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
1979        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
1980            .expect("encode value-storage payload");
1981        writer
1982            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
1983            .expect("write scalar slot");
1984        writer
1985            .write_slot(1, Some(payload.as_slice()))
1986            .expect("write value-storage slot");
1987        let raw_row = RawRow::try_new(
1988            serialize_row_payload(writer.finish().expect("finish slot payload"))
1989                .expect("serialize row payload"),
1990        )
1991        .expect("build raw row");
1992
1993        let direct_slots =
1994            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
1995        let mut cached_slots =
1996            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
1997
1998        let direct_name = decode_slot_value_by_contract(&direct_slots, 0).expect("decode name");
1999        let direct_payload =
2000            decode_slot_value_by_contract(&direct_slots, 1).expect("decode payload");
2001        let cached_name = cached_slots.get_value(0).expect("cached name");
2002        let cached_payload = cached_slots.get_value(1).expect("cached payload");
2003
2004        assert_eq!(direct_name, cached_name);
2005        assert_eq!(direct_payload, cached_payload);
2006    }
2007
2008    #[test]
2009    fn apply_update_patch_to_raw_row_updates_only_targeted_slots() {
2010        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2011        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2012            .expect("encode value-storage payload");
2013        writer
2014            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2015            .expect("write scalar slot");
2016        writer
2017            .write_slot(1, Some(payload.as_slice()))
2018            .expect("write value-storage slot");
2019        let raw_row = RawRow::try_new(
2020            serialize_row_payload(writer.finish().expect("finish slot payload"))
2021                .expect("serialize row payload"),
2022        )
2023        .expect("build raw row");
2024        let patch = UpdatePatch::new().set(
2025            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2026            Value::Text("Grace".to_string()),
2027        );
2028
2029        let patched =
2030            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
2031        let mut reader =
2032            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2033
2034        assert_eq!(
2035            reader.get_value(0).expect("decode slot"),
2036            Some(Value::Text("Grace".to_string()))
2037        );
2038        assert_eq!(
2039            reader.get_value(1).expect("decode slot"),
2040            Some(Value::Text("payload".to_string()))
2041        );
2042    }
2043
2044    #[test]
2045    fn serialize_update_patch_fields_encodes_canonical_slot_payloads() {
2046        let patch = UpdatePatch::new()
2047            .set(
2048                FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2049                Value::Text("Grace".to_string()),
2050            )
2051            .set(
2052                FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2053                Value::Text("payload".to_string()),
2054            );
2055
2056        let serialized =
2057            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2058
2059        assert_eq!(serialized.entries().len(), 2);
2060        assert_eq!(
2061            decode_slot_value_from_bytes(
2062                &TEST_MODEL,
2063                serialized.entries()[0].slot().index(),
2064                serialized.entries()[0].payload(),
2065            )
2066            .expect("decode slot payload"),
2067            Value::Text("Grace".to_string())
2068        );
2069        assert_eq!(
2070            decode_slot_value_from_bytes(
2071                &TEST_MODEL,
2072                serialized.entries()[1].slot().index(),
2073                serialized.entries()[1].payload(),
2074            )
2075            .expect("decode slot payload"),
2076            Value::Text("payload".to_string())
2077        );
2078    }
2079
2080    #[test]
2081    fn serialized_patch_writer_rejects_clear_slots() {
2082        let mut writer = SerializedPatchWriter::for_model(&TEST_MODEL);
2083
2084        let err = writer
2085            .write_slot(0, None)
2086            .expect_err("0.65 patch staging must reject missing-slot clears");
2087
2088        assert!(
2089            err.message
2090                .contains("serialized patch writer cannot clear slot 0"),
2091            "unexpected error: {err:?}"
2092        );
2093        assert!(
2094            err.message.contains(TEST_MODEL.path()),
2095            "unexpected error: {err:?}"
2096        );
2097    }
2098
2099    #[test]
2100    fn slot_buffer_writer_rejects_clear_slots() {
2101        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2102
2103        let err = writer
2104            .write_slot(0, None)
2105            .expect_err("canonical row staging must reject missing-slot clears");
2106
2107        assert!(
2108            err.message
2109                .contains("slot buffer writer cannot clear slot 0"),
2110            "unexpected error: {err:?}"
2111        );
2112        assert!(
2113            err.message.contains(TEST_MODEL.path()),
2114            "unexpected error: {err:?}"
2115        );
2116    }
2117
2118    #[test]
2119    fn apply_update_patch_to_raw_row_uses_last_write_wins() {
2120        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2121        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2122            .expect("encode value-storage payload");
2123        writer
2124            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2125            .expect("write scalar slot");
2126        writer
2127            .write_slot(1, Some(payload.as_slice()))
2128            .expect("write value-storage slot");
2129        let raw_row = RawRow::try_new(
2130            serialize_row_payload(writer.finish().expect("finish slot payload"))
2131                .expect("serialize row payload"),
2132        )
2133        .expect("build raw row");
2134        let slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
2135        let patch = UpdatePatch::new()
2136            .set(slot, Value::Text("Grace".to_string()))
2137            .set(slot, Value::Text("Lin".to_string()));
2138
2139        let patched =
2140            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
2141        let mut reader =
2142            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2143
2144        assert_eq!(
2145            reader.get_value(0).expect("decode slot"),
2146            Some(Value::Text("Lin".to_string()))
2147        );
2148    }
2149
2150    #[test]
2151    fn apply_update_patch_to_raw_row_rejects_noncanonical_missing_slot_baseline() {
2152        let empty_slots = vec![None::<&[u8]>; TEST_MODEL.fields().len()];
2153        let raw_row = RawRow::try_new(
2154            serialize_row_payload(
2155                encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, empty_slots.as_slice())
2156                    .expect("encode malformed slot payload"),
2157            )
2158            .expect("serialize row payload"),
2159        )
2160        .expect("build raw row");
2161        let patch = UpdatePatch::new().set(
2162            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2163            Value::Text("payload".to_string()),
2164        );
2165
2166        let err = apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch)
2167            .expect_err("noncanonical rows with missing slots must fail closed");
2168
2169        assert_eq!(err.message, "row decode: missing slot payload: slot=0");
2170    }
2171
2172    #[test]
2173    fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_baseline() {
2174        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2175            .expect("encode value-storage payload");
2176        let malformed_slots = [Some([0xF6].as_slice()), Some(payload.as_slice())];
2177        let raw_row = RawRow::try_new(
2178            serialize_row_payload(
2179                encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, &malformed_slots)
2180                    .expect("encode malformed slot payload"),
2181            )
2182            .expect("serialize row payload"),
2183        )
2184        .expect("build raw row");
2185        let patch = UpdatePatch::new().set(
2186            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2187            Value::Text("patched".to_string()),
2188        );
2189        let serialized =
2190            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2191
2192        let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
2193            .expect_err("noncanonical scalar baseline must fail closed");
2194
2195        assert!(
2196            err.message.contains("field 'name'"),
2197            "unexpected error: {err:?}"
2198        );
2199        assert!(
2200            err.message
2201                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2202            "unexpected error: {err:?}"
2203        );
2204    }
2205
2206    #[test]
2207    fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_patch_payload() {
2208        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2209        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2210            .expect("encode value-storage payload");
2211        writer
2212            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2213            .expect("write scalar slot");
2214        writer
2215            .write_slot(1, Some(payload.as_slice()))
2216            .expect("write value-storage slot");
2217        let raw_row = RawRow::try_new(
2218            serialize_row_payload(writer.finish().expect("finish slot payload"))
2219                .expect("serialize row payload"),
2220        )
2221        .expect("build raw row");
2222        let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
2223            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2224            vec![0xF6],
2225        )]);
2226
2227        let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
2228            .expect_err("noncanonical serialized patch payload must fail closed");
2229
2230        assert!(
2231            err.message.contains("field 'name'"),
2232            "unexpected error: {err:?}"
2233        );
2234        assert!(
2235            err.message
2236                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2237            "unexpected error: {err:?}"
2238        );
2239    }
2240
2241    #[test]
2242    fn structural_slot_reader_rejects_slot_count_mismatch() {
2243        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2244        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2245            .expect("encode payload");
2246        writer
2247            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2248            .expect("write scalar slot");
2249        writer
2250            .write_slot(1, Some(payload.as_slice()))
2251            .expect("write payload slot");
2252        let mut payload = writer.finish().expect("finish slot payload");
2253        payload[..2].copy_from_slice(&1_u16.to_be_bytes());
2254        let raw_row =
2255            RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
2256                .expect("build raw row");
2257
2258        let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
2259            .err()
2260            .expect("slot-count drift must fail closed");
2261
2262        assert_eq!(
2263            err.message,
2264            "row decode: slot count mismatch: expected 2, found 1"
2265        );
2266    }
2267
2268    #[test]
2269    fn structural_slot_reader_rejects_slot_span_exceeds_payload_length() {
2270        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2271        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2272            .expect("encode payload");
2273        writer
2274            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2275            .expect("write scalar slot");
2276        writer
2277            .write_slot(1, Some(payload.as_slice()))
2278            .expect("write payload slot");
2279        let mut payload = writer.finish().expect("finish slot payload");
2280
2281        // Corrupt the second slot span so the payload table points past the
2282        // available data section.
2283        payload[14..18].copy_from_slice(&u32::MAX.to_be_bytes());
2284        let raw_row =
2285            RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
2286                .expect("build raw row");
2287
2288        let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
2289            .err()
2290            .expect("slot span drift must fail closed");
2291
2292        assert_eq!(err.message, "row decode: slot span exceeds payload length");
2293    }
2294
2295    #[test]
2296    fn apply_serialized_update_patch_to_raw_row_replays_preencoded_slots() {
2297        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2298        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2299            .expect("encode value-storage payload");
2300        writer
2301            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2302            .expect("write scalar slot");
2303        writer
2304            .write_slot(1, Some(payload.as_slice()))
2305            .expect("write value-storage slot");
2306        let raw_row = RawRow::try_new(
2307            serialize_row_payload(writer.finish().expect("finish slot payload"))
2308                .expect("serialize row payload"),
2309        )
2310        .expect("build raw row");
2311        let patch = UpdatePatch::new().set(
2312            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2313            Value::Text("Grace".to_string()),
2314        );
2315        let serialized =
2316            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2317
2318        let patched = raw_row
2319            .apply_serialized_update_patch(&TEST_MODEL, &serialized)
2320            .expect("apply serialized patch");
2321        let mut reader =
2322            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2323
2324        assert_eq!(
2325            reader.get_value(0).expect("decode slot"),
2326            Some(Value::Text("Grace".to_string()))
2327        );
2328    }
2329
2330    #[test]
2331    fn serialize_entity_slots_as_update_patch_replays_full_typed_after_image() {
2332        let old_entity = PersistedRowPatchBridgeEntity {
2333            id: crate::types::Ulid::from_u128(7),
2334            name: "Ada".to_string(),
2335        };
2336        let new_entity = PersistedRowPatchBridgeEntity {
2337            id: crate::types::Ulid::from_u128(7),
2338            name: "Grace".to_string(),
2339        };
2340        let raw_row = RawRow::from_entity(&old_entity).expect("encode old row");
2341        let old_decoded = raw_row
2342            .try_decode::<PersistedRowPatchBridgeEntity>()
2343            .expect("decode old entity");
2344        let serialized =
2345            serialize_entity_slots_as_update_patch(&new_entity).expect("serialize entity patch");
2346        let direct =
2347            RawRow::from_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
2348                .expect("direct row emission should succeed");
2349
2350        let patched = raw_row
2351            .apply_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
2352            .expect("apply serialized patch");
2353        let decoded = patched
2354            .try_decode::<PersistedRowPatchBridgeEntity>()
2355            .expect("decode patched entity");
2356
2357        assert_eq!(
2358            direct, patched,
2359            "fresh row emission and replayed full-image patch must converge on identical bytes",
2360        );
2361        assert_eq!(old_decoded, old_entity);
2362        assert_eq!(decoded, new_entity);
2363    }
2364
2365    #[test]
2366    fn canonical_row_from_raw_row_replays_canonical_full_image_bytes() {
2367        let entity = PersistedRowPatchBridgeEntity {
2368            id: crate::types::Ulid::from_u128(11),
2369            name: "Ada".to_string(),
2370        };
2371        let raw_row = RawRow::from_entity(&entity).expect("encode canonical row");
2372        let canonical = crate::db::data::canonical_row_from_raw_row(
2373            PersistedRowPatchBridgeEntity::MODEL,
2374            &raw_row,
2375        )
2376        .expect("canonical re-emission should succeed");
2377
2378        assert_eq!(
2379            canonical.as_bytes(),
2380            raw_row.as_bytes(),
2381            "canonical raw-row rebuild must preserve already canonical row bytes",
2382        );
2383    }
2384
2385    #[test]
2386    fn canonical_row_from_raw_row_rejects_noncanonical_scalar_payload() {
2387        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2388            .expect("encode value-storage payload");
2389        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2390        writer
2391            .write_slot(0, Some(&[0xF6]))
2392            .expect("write malformed scalar slot");
2393        writer
2394            .write_slot(1, Some(payload.as_slice()))
2395            .expect("write value-storage slot");
2396        let raw_row = RawRow::try_new(
2397            serialize_row_payload(writer.finish().expect("finish slot payload"))
2398                .expect("serialize malformed row"),
2399        )
2400        .expect("build malformed raw row");
2401
2402        let err = crate::db::data::canonical_row_from_raw_row(&TEST_MODEL, &raw_row)
2403            .expect_err("canonical raw-row rebuild must reject malformed scalar payloads");
2404
2405        assert!(
2406            err.message.contains("field 'name'"),
2407            "unexpected error: {err:?}"
2408        );
2409        assert!(
2410            err.message
2411                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2412            "unexpected error: {err:?}"
2413        );
2414    }
2415
2416    #[test]
2417    fn raw_row_from_serialized_update_patch_rejects_noncanonical_scalar_payload() {
2418        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2419            .expect("encode value-storage payload");
2420        let serialized = SerializedUpdatePatch::new(vec![
2421            SerializedFieldUpdate::new(
2422                FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2423                vec![0xF6],
2424            ),
2425            SerializedFieldUpdate::new(
2426                FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2427                payload,
2428            ),
2429        ]);
2430
2431        let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
2432            .expect_err("fresh row emission must reject noncanonical serialized patch payloads");
2433
2434        assert!(
2435            err.message.contains("field 'name'"),
2436            "unexpected error: {err:?}"
2437        );
2438        assert!(
2439            err.message
2440                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2441            "unexpected error: {err:?}"
2442        );
2443    }
2444
2445    #[test]
2446    fn raw_row_from_serialized_update_patch_rejects_incomplete_slot_image() {
2447        let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
2448            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2449            crate::serialize::serialize(&Value::Text("payload".to_string()))
2450                .expect("encode value-storage payload"),
2451        )]);
2452
2453        let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
2454            .expect_err("fresh row emission must reject missing declared slots");
2455
2456        assert!(
2457            err.message.contains("serialized patch did not emit slot 0"),
2458            "unexpected error: {err:?}"
2459        );
2460    }
2461}