Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::{
9        codec::serialize_row_payload,
10        data::{
11            CanonicalRow, DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
12            decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
13            decode_structural_value_storage_bytes,
14        },
15        scalar_expr::compile_scalar_literal_expr_value,
16        schema::{field_type_from_model_kind, literal_matches_type},
17    },
18    error::InternalError,
19    model::{
20        entity::{EntityModel, resolve_field_slot, resolve_primary_key_slot},
21        field::{FieldKind, FieldModel, FieldStorageDecode, LeafCodec, ScalarCodec},
22    },
23    serialize::{deserialize, serialize},
24    traits::EntityKind,
25    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
26    value::{StorageKey, Value, ValueEnum},
27};
28use serde_cbor::{Value as CborValue, value::to_value as to_cbor_value};
29use std::{cmp::Ordering, collections::BTreeMap, str};
30
31const SCALAR_SLOT_PREFIX: u8 = 0xFF;
32const SCALAR_SLOT_TAG_NULL: u8 = 0;
33const SCALAR_SLOT_TAG_VALUE: u8 = 1;
34
35const SCALAR_BOOL_PAYLOAD_LEN: usize = 1;
36const SCALAR_WORD32_PAYLOAD_LEN: usize = 4;
37const SCALAR_WORD64_PAYLOAD_LEN: usize = 8;
38const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
39const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;
40
41const SCALAR_BOOL_FALSE_TAG: u8 = 0;
42const SCALAR_BOOL_TRUE_TAG: u8 = 1;
43
44///
45/// FieldSlot
46///
47/// FieldSlot
48///
49/// FieldSlot is the structural stable slot reference used by the `0.64`
50/// patching path.
51/// It intentionally carries only the model-local slot index so field-level
52/// mutation stays structural instead of reintroducing typed entity helpers.
53///
54
55#[allow(dead_code)]
56#[derive(Clone, Copy, Debug, Eq, PartialEq)]
57pub(in crate::db) struct FieldSlot {
58    index: usize,
59}
60
61#[allow(dead_code)]
62impl FieldSlot {
63    /// Resolve one stable field slot by runtime field name.
64    #[must_use]
65    pub(in crate::db) fn resolve(model: &'static EntityModel, field_name: &str) -> Option<Self> {
66        resolve_field_slot(model, field_name).map(|index| Self { index })
67    }
68
69    /// Build one stable field slot from an already validated index.
70    pub(in crate::db) fn from_index(
71        model: &'static EntityModel,
72        index: usize,
73    ) -> Result<Self, InternalError> {
74        field_model_for_slot(model, index)?;
75
76        Ok(Self { index })
77    }
78
79    /// Return the stable slot index inside `EntityModel::fields`.
80    #[must_use]
81    pub(in crate::db) const fn index(self) -> usize {
82        self.index
83    }
84}
85
86///
87/// FieldUpdate
88///
89/// FieldUpdate
90///
91/// FieldUpdate carries one ordered field-level mutation over the structural
92/// persisted-row boundary.
93/// `UpdatePatch` applies these entries in order and last write wins for the
94/// same slot.
95///
96
97#[allow(dead_code)]
98#[derive(Clone, Debug, Eq, PartialEq)]
99pub(in crate::db) struct FieldUpdate {
100    slot: FieldSlot,
101    value: Value,
102}
103
104#[allow(dead_code)]
105impl FieldUpdate {
106    /// Build one field-level structural update.
107    #[must_use]
108    pub(in crate::db) const fn new(slot: FieldSlot, value: Value) -> Self {
109        Self { slot, value }
110    }
111
112    /// Return the stable target slot.
113    #[must_use]
114    pub(in crate::db) const fn slot(&self) -> FieldSlot {
115        self.slot
116    }
117
118    /// Return the runtime value payload for this update.
119    #[must_use]
120    pub(in crate::db) const fn value(&self) -> &Value {
121        &self.value
122    }
123}
124
125///
126/// UpdatePatch
127///
128/// UpdatePatch
129///
130/// UpdatePatch is the ordered structural mutation program applied to one
131/// persisted row.
132/// This is the phase-1 `0.64` patch container: it updates slot values
133/// structurally and then re-encodes the full row.
134///
135
136#[derive(Clone, Debug, Default, Eq, PartialEq)]
137pub struct UpdatePatch {
138    entries: Vec<FieldUpdate>,
139}
140
141impl UpdatePatch {
142    /// Build one empty patch.
143    #[must_use]
144    pub const fn new() -> Self {
145        Self {
146            entries: Vec::new(),
147        }
148    }
149
150    /// Append one structural field update in declaration order.
151    #[must_use]
152    pub(in crate::db) fn set(mut self, slot: FieldSlot, value: Value) -> Self {
153        self.entries.push(FieldUpdate::new(slot, value));
154        self
155    }
156
157    /// Resolve one field name and append its structural update.
158    pub fn set_field(
159        self,
160        model: &'static EntityModel,
161        field_name: &str,
162        value: Value,
163    ) -> Result<Self, InternalError> {
164        let Some(slot) = FieldSlot::resolve(model, field_name) else {
165            return Err(InternalError::mutation_structural_field_unknown(
166                model.path(),
167                field_name,
168            ));
169        };
170
171        Ok(self.set(slot, value))
172    }
173
174    /// Borrow the ordered field updates carried by this patch.
175    #[must_use]
176    pub(in crate::db) const fn entries(&self) -> &[FieldUpdate] {
177        self.entries.as_slice()
178    }
179
180    /// Return whether this patch carries no field updates.
181    #[must_use]
182    pub(in crate::db) const fn is_empty(&self) -> bool {
183        self.entries.is_empty()
184    }
185}
186
187///
188/// SerializedFieldUpdate
189///
190/// SerializedFieldUpdate
191///
192/// SerializedFieldUpdate carries one ordered field-level mutation after the
193/// owning persisted-row field codec has already lowered the runtime `Value`
194/// into canonical slot payload bytes.
195/// This lets later patch-application stages consume one mechanical slot-patch
196/// artifact instead of rebuilding per-field encode dispatch.
197///
198
199#[allow(dead_code)]
200#[derive(Clone, Debug, Eq, PartialEq)]
201pub(in crate::db) struct SerializedFieldUpdate {
202    slot: FieldSlot,
203    payload: Vec<u8>,
204}
205
206#[allow(dead_code)]
207impl SerializedFieldUpdate {
208    /// Build one serialized structural field update.
209    #[must_use]
210    pub(in crate::db) const fn new(slot: FieldSlot, payload: Vec<u8>) -> Self {
211        Self { slot, payload }
212    }
213
214    /// Return the stable target slot.
215    #[must_use]
216    pub(in crate::db) const fn slot(&self) -> FieldSlot {
217        self.slot
218    }
219
220    /// Borrow the canonical slot payload bytes for this update when present.
221    #[must_use]
222    pub(in crate::db) const fn payload(&self) -> &[u8] {
223        self.payload.as_slice()
224    }
225}
226
227///
228/// SerializedUpdatePatch
229///
230/// SerializedUpdatePatch
231///
232/// SerializedUpdatePatch is the canonical serialized form of `UpdatePatch`
233/// over persisted-row slot payload bytes.
234/// This is the structural patch artifact later write-path stages can stage or
235/// replay without re-entering field-contract encode logic.
236///
237
238#[allow(dead_code)]
239#[derive(Clone, Debug, Default, Eq, PartialEq)]
240pub(in crate::db) struct SerializedUpdatePatch {
241    entries: Vec<SerializedFieldUpdate>,
242}
243
244#[allow(dead_code)]
245impl SerializedUpdatePatch {
246    /// Build one serialized patch from already encoded slot payloads.
247    #[must_use]
248    pub(in crate::db) const fn new(entries: Vec<SerializedFieldUpdate>) -> Self {
249        Self { entries }
250    }
251
252    /// Borrow the ordered serialized field updates carried by this patch.
253    #[must_use]
254    pub(in crate::db) const fn entries(&self) -> &[SerializedFieldUpdate] {
255        self.entries.as_slice()
256    }
257
258    /// Return whether this serialized patch carries no field updates.
259    #[must_use]
260    pub(in crate::db) const fn is_empty(&self) -> bool {
261        self.entries.is_empty()
262    }
263}
264
265///
266/// SlotReader
267///
268/// SlotReader exposes one persisted row as stable slot-addressable fields.
269/// Callers may inspect field presence, borrow raw field bytes, or decode one
270/// field value on demand.
271///
272
273pub trait SlotReader {
274    /// Return the structural model that owns this slot mapping.
275    fn model(&self) -> &'static EntityModel;
276
277    /// Return whether the given slot is present in the persisted row.
278    fn has(&self, slot: usize) -> bool;
279
280    /// Borrow the raw persisted payload for one slot when present.
281    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
282
283    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
284    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
285
286    /// Decode one slot value on demand using the field contract declared by the model.
287    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
288}
289
290///
291/// CanonicalSlotReader
292///
293/// CanonicalSlotReader
294///
295/// CanonicalSlotReader is the stricter structural row-reader contract used
296/// once `0.65` canonical-row invariants are in force.
297/// Declared slots must already exist, so callers can fail closed on missing
298/// payloads instead of carrying absent-slot fallback branches.
299///
300
301pub(in crate::db) trait CanonicalSlotReader: SlotReader {
302    /// Borrow one declared slot payload, erroring when the persisted row is not canonical.
303    fn required_bytes(&self, slot: usize) -> Result<&[u8], InternalError> {
304        let field = field_model_for_slot(self.model(), slot)?;
305
306        self.get_bytes(slot)
307            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
308    }
309
310    /// Read one scalar slot through the structural fast path without allowing
311    /// declared-slot absence.
312    fn required_scalar(&self, slot: usize) -> Result<ScalarSlotValueRef<'_>, InternalError> {
313        let field = field_model_for_slot(self.model(), slot)?;
314        debug_assert!(matches!(field.leaf_codec(), LeafCodec::Scalar(_)));
315
316        self.get_scalar(slot)?
317            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field.name()))
318    }
319
320    /// Decode one declared slot through the owning field contract without
321    /// allowing absent payloads.
322    fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
323        decode_slot_value_from_bytes(self.model(), slot, self.required_bytes(slot)?)
324    }
325}
326
327///
328/// SlotWriter
329///
330/// SlotWriter is the canonical row-container output seam used by persisted-row
331/// writers.
332///
333
334pub trait SlotWriter {
335    /// Record one slot payload for the current row.
336    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
337
338    /// Record one scalar slot payload using the canonical scalar leaf envelope.
339    fn write_scalar(
340        &mut self,
341        slot: usize,
342        value: ScalarSlotValueRef<'_>,
343    ) -> Result<(), InternalError> {
344        let payload = encode_scalar_slot_value(value);
345
346        self.write_slot(slot, Some(payload.as_slice()))
347    }
348}
349
350// Resolve one staged slot cell by layout index before writer-specific payload handling.
351fn slot_cell_mut<T>(slots: &mut [T], slot: usize) -> Result<&mut T, InternalError> {
352    slots.get_mut(slot).ok_or_else(|| {
353        InternalError::persisted_row_encode_failed(
354            format!("slot {slot} is outside the row layout",),
355        )
356    })
357}
358
359// Reject slot clears at the canonical slot-image staging boundary while keeping
360// writer-specific error wording at the call site.
361fn required_slot_payload_bytes<'a>(
362    model: &'static EntityModel,
363    writer_label: &str,
364    slot: usize,
365    payload: Option<&'a [u8]>,
366) -> Result<&'a [u8], InternalError> {
367    payload.ok_or_else(|| {
368        InternalError::persisted_row_encode_failed(format!(
369            "{writer_label} cannot clear slot {slot} for entity '{}'",
370            model.path()
371        ))
372    })
373}
374
375// Encode one fixed-width slot table plus concatenated slot payload bytes into
376// the canonical row payload container.
377fn encode_slot_payload_from_parts(
378    slot_count: usize,
379    slot_table: &[(u32, u32)],
380    payload_bytes: &[u8],
381) -> Result<Vec<u8>, InternalError> {
382    let field_count = u16::try_from(slot_count).map_err(|_| {
383        InternalError::persisted_row_encode_failed(format!(
384            "field count {slot_count} exceeds u16 slot table capacity",
385        ))
386    })?;
387    let mut encoded = Vec::with_capacity(
388        usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
389    );
390    encoded.extend_from_slice(&field_count.to_be_bytes());
391    for (start, len) in slot_table {
392        encoded.extend_from_slice(&start.to_be_bytes());
393        encoded.extend_from_slice(&len.to_be_bytes());
394    }
395    encoded.extend_from_slice(payload_bytes);
396
397    Ok(encoded)
398}
399
400///
401/// PersistedRow
402///
403/// PersistedRow is the derive-owned bridge between typed entities and
404/// slot-addressable persisted rows.
405/// It owns entity-specific materialization/default semantics while runtime
406/// paths stay structural at the row boundary.
407///
408
409pub trait PersistedRow: EntityKind + Sized {
410    /// Materialize one typed entity from one slot reader.
411    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
412
413    /// Write one typed entity into one slot writer.
414    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
415
416    /// Decode one slot value needed by structural planner/projection consumers.
417    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
418    where
419        Self: crate::traits::FieldProjection,
420    {
421        let entity = Self::materialize_from_slots(slots)?;
422
423        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
424    }
425}
426
427/// Decode one slot value through the declared field contract without routing
428/// through `SlotReader::get_value`.
429#[cfg(test)]
430pub(in crate::db) fn decode_slot_value_by_contract(
431    slots: &dyn SlotReader,
432    slot: usize,
433) -> Result<Option<Value>, InternalError> {
434    let Some(raw_value) = slots.get_bytes(slot) else {
435        return Ok(None);
436    };
437
438    decode_slot_value_from_bytes(slots.model(), slot, raw_value).map(Some)
439}
440
441/// Decode one structural slot payload using the owning model field contract.
442///
443/// This is the canonical field-level decode boundary for persisted-row bytes.
444/// Higher-level row readers may still cache decoded values, but they should not
445/// rebuild scalar-vs-CBOR field dispatch themselves.
446pub(in crate::db) fn decode_slot_value_from_bytes(
447    model: &'static EntityModel,
448    slot: usize,
449    raw_value: &[u8],
450) -> Result<Value, InternalError> {
451    let field = field_model_for_slot(model, slot)?;
452
453    match field.leaf_codec() {
454        LeafCodec::Scalar(codec) => match decode_scalar_slot_value(raw_value, codec, field.name())?
455        {
456            ScalarSlotValueRef::Null => Ok(Value::Null),
457            ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
458        },
459        LeafCodec::CborFallback => decode_non_scalar_slot_value(raw_value, field),
460    }
461}
462
463/// Encode one structural slot value using the owning model field contract.
464///
465/// This is the initial `0.64` write-side field-codec boundary. It currently
466/// covers:
467/// - scalar leaf slots
468/// - `FieldStorageDecode::Value` slots
469///
470/// Composite `ByKind` field encoding remains a follow-up slice so the runtime
471/// can add one structural encoder owner instead of quietly rebuilding typed
472/// per-field branches.
473#[allow(dead_code)]
474pub(in crate::db) fn encode_slot_value_from_value(
475    model: &'static EntityModel,
476    slot: usize,
477    value: &Value,
478) -> Result<Vec<u8>, InternalError> {
479    let field = field_model_for_slot(model, slot)?;
480    ensure_slot_value_matches_field_contract(field, value)?;
481
482    match field.storage_decode() {
483        FieldStorageDecode::Value => serialize(value)
484            .map_err(|err| InternalError::persisted_row_field_encode_failed(field.name(), err)),
485        FieldStorageDecode::ByKind => match field.leaf_codec() {
486            LeafCodec::Scalar(_) => {
487                let scalar = compile_scalar_literal_expr_value(value).ok_or_else(|| {
488                    InternalError::persisted_row_field_encode_failed(
489                        field.name(),
490                        format!(
491                            "field kind {:?} requires a scalar runtime value, found {value:?}",
492                            field.kind()
493                        ),
494                    )
495                })?;
496
497                Ok(encode_scalar_slot_value(scalar.as_slot_value_ref()))
498            }
499            LeafCodec::CborFallback => {
500                encode_structural_field_bytes_by_kind(field.kind(), value, field.name())
501            }
502        },
503    }
504}
505
506// Decode one slot payload and immediately re-encode it through the current
507// field contract so every row-emission path normalizes bytes at the boundary.
508fn canonicalize_slot_payload(
509    model: &'static EntityModel,
510    slot: usize,
511    raw_value: &[u8],
512) -> Result<Vec<u8>, InternalError> {
513    let value = decode_slot_value_from_bytes(model, slot, raw_value)?;
514
515    encode_slot_value_from_value(model, slot, &value)
516}
517
518// Emit one raw row from a dense canonical slot image.
519fn emit_raw_row_from_slot_payloads(
520    model: &'static EntityModel,
521    slot_payloads: &[Vec<u8>],
522) -> Result<CanonicalRow, InternalError> {
523    if slot_payloads.len() != model.fields().len() {
524        return Err(InternalError::persisted_row_encode_failed(format!(
525            "canonical slot image expected {} slots for entity '{}', found {}",
526            model.fields().len(),
527            model.path(),
528            slot_payloads.len()
529        )));
530    }
531
532    let mut writer = SlotBufferWriter::for_model(model);
533
534    // Phase 1: write the already canonicalized dense slot image through the
535    // single row-container authority.
536    for (slot, payload) in slot_payloads.iter().enumerate() {
537        writer.write_slot(slot, Some(payload.as_slice()))?;
538    }
539
540    // Phase 2: wrap the canonical slot container in the shared row envelope.
541    let encoded = serialize_row_payload(writer.finish()?)?;
542    let raw_row = RawRow::from_untrusted_bytes(encoded).map_err(InternalError::from)?;
543
544    Ok(CanonicalRow::from_canonical_raw_row(raw_row))
545}
546
547// Build one dense canonical slot image from a serialized patch, failing closed
548// when any declared slot is missing or any payload is non-canonical.
549fn dense_canonical_slot_image_from_serialized_patch(
550    model: &'static EntityModel,
551    patch: &SerializedUpdatePatch,
552) -> Result<Vec<Vec<u8>>, InternalError> {
553    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
554    let mut slot_payloads = Vec::with_capacity(model.fields().len());
555
556    for (slot, payload) in patch_payloads.into_iter().enumerate() {
557        let payload = payload.ok_or_else(|| {
558            InternalError::persisted_row_encode_failed(format!(
559                "serialized patch did not emit slot {slot} for entity '{}'",
560                model.path()
561            ))
562        })?;
563        slot_payloads.push(canonicalize_slot_payload(model, slot, payload)?);
564    }
565
566    Ok(slot_payloads)
567}
568
569/// Build one canonical row from one serialized structural patch that already
570/// describes a full logical row image.
571pub(in crate::db) fn canonical_row_from_serialized_update_patch(
572    model: &'static EntityModel,
573    patch: &SerializedUpdatePatch,
574) -> Result<CanonicalRow, InternalError> {
575    let slot_payloads = dense_canonical_slot_image_from_serialized_patch(model, patch)?;
576
577    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
578}
579
580// Rebuild one full canonical row image from an existing raw row before it
581// crosses a storage write boundary.
582pub(in crate::db) fn canonical_row_from_raw_row(
583    model: &'static EntityModel,
584    raw_row: &RawRow,
585) -> Result<CanonicalRow, InternalError> {
586    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
587        .map_err(StructuralRowDecodeError::into_internal_error)?;
588    let mut slot_payloads = Vec::with_capacity(model.fields().len());
589
590    // Phase 1: canonicalize every declared slot from the existing row image.
591    for slot in 0..model.fields().len() {
592        let existing_payload = field_bytes.field(slot).ok_or_else(|| {
593            InternalError::persisted_row_encode_failed(format!(
594                "slot {slot} is missing from the baseline row for entity '{}'",
595                model.path()
596            ))
597        })?;
598        slot_payloads.push(canonicalize_slot_payload(model, slot, existing_payload)?);
599    }
600
601    // Phase 2: re-emit the full image through the single row-emission owner.
602    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
603}
604
605// Rewrap one row already loaded from storage as a canonical write token.
606pub(in crate::db) const fn canonical_row_from_stored_raw_row(raw_row: RawRow) -> CanonicalRow {
607    CanonicalRow::from_canonical_raw_row(raw_row)
608}
609
610/// Apply one ordered structural patch to one raw row using the current
611/// persisted-row field codec authority.
612#[allow(dead_code)]
613pub(in crate::db) fn apply_update_patch_to_raw_row(
614    model: &'static EntityModel,
615    raw_row: &RawRow,
616    patch: &UpdatePatch,
617) -> Result<CanonicalRow, InternalError> {
618    let serialized_patch = serialize_update_patch_fields(model, patch)?;
619
620    apply_serialized_update_patch_to_raw_row(model, raw_row, &serialized_patch)
621}
622
623/// Serialize one ordered structural patch into canonical slot payload bytes.
624///
625/// This is the phase-1 partial-serialization seam for `0.64`: later mutation
626/// stages can stage or replay one field patch without rebuilding the runtime
627/// value-to-bytes contract per consumer.
628#[allow(dead_code)]
629pub(in crate::db) fn serialize_update_patch_fields(
630    model: &'static EntityModel,
631    patch: &UpdatePatch,
632) -> Result<SerializedUpdatePatch, InternalError> {
633    if patch.is_empty() {
634        return Ok(SerializedUpdatePatch::default());
635    }
636
637    let mut entries = Vec::with_capacity(patch.entries().len());
638
639    // Phase 1: validate and encode each ordered field update through the
640    // canonical slot codec owner.
641    for entry in patch.entries() {
642        let slot = entry.slot();
643        let payload = encode_slot_value_from_value(model, slot.index(), entry.value())?;
644        entries.push(SerializedFieldUpdate::new(slot, payload));
645    }
646
647    Ok(SerializedUpdatePatch::new(entries))
648}
649
650/// Serialize one full typed entity image into the canonical serialized patch
651/// artifact used by row-boundary patch replay.
652///
653/// This keeps typed save/update APIs on the existing surface while moving the
654/// actual after-image staging onto the structural slot-patch boundary.
655#[allow(dead_code)]
656pub(in crate::db) fn serialize_entity_slots_as_update_patch<E>(
657    entity: &E,
658) -> Result<SerializedUpdatePatch, InternalError>
659where
660    E: PersistedRow,
661{
662    let mut writer = SerializedPatchWriter::for_model(E::MODEL);
663
664    // Phase 1: let the derive-owned persisted-row writer emit the complete
665    // structural slot image for this entity.
666    entity.write_slots(&mut writer)?;
667
668    // Phase 2: require a dense slot image so save/update replay remains
669    // equivalent to the existing full-row write semantics.
670    writer.finish_complete()
671}
672
673/// Apply one serialized structural patch to one raw row.
674///
675/// This mechanical replay step no longer owns any `Value -> bytes` dispatch.
676/// It only replays already encoded slot payloads over the current row layout.
677#[allow(dead_code)]
678pub(in crate::db) fn apply_serialized_update_patch_to_raw_row(
679    model: &'static EntityModel,
680    raw_row: &RawRow,
681    patch: &SerializedUpdatePatch,
682) -> Result<CanonicalRow, InternalError> {
683    if patch.is_empty() {
684        return canonical_row_from_raw_row(model, raw_row);
685    }
686
687    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
688        .map_err(StructuralRowDecodeError::into_internal_error)?;
689    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
690    let mut slot_payloads = Vec::with_capacity(model.fields().len());
691
692    // Phase 1: replay the current row layout slot-by-slot.
693    // Both patch and baseline bytes are normalized through the field contract
694    // so no opaque payload can cross into the emitted row image.
695    for (slot, patch_payload) in patch_payloads.iter().enumerate() {
696        if let Some(payload) = patch_payload {
697            slot_payloads.push(canonicalize_slot_payload(model, slot, payload)?);
698        } else {
699            let existing_payload = field_bytes.field(slot).ok_or_else(|| {
700                InternalError::persisted_row_encode_failed(format!(
701                    "slot {slot} is missing from the baseline row for entity '{}'",
702                    model.path()
703                ))
704            })?;
705            slot_payloads.push(canonicalize_slot_payload(model, slot, existing_payload)?);
706        }
707    }
708
709    // Phase 2: emit the rebuilt row through the single row-construction owner.
710    emit_raw_row_from_slot_payloads(model, slot_payloads.as_slice())
711}
712
713// Decode one non-scalar slot through the exact persisted contract declared by
714// the field model.
715fn decode_non_scalar_slot_value(
716    raw_value: &[u8],
717    field: &FieldModel,
718) -> Result<Value, InternalError> {
719    let decoded = match field.storage_decode() {
720        crate::model::field::FieldStorageDecode::ByKind => {
721            decode_structural_field_by_kind_bytes(raw_value, field.kind())
722        }
723        crate::model::field::FieldStorageDecode::Value => {
724            decode_structural_value_storage_bytes(raw_value)
725        }
726    };
727
728    decoded.map_err(|err| {
729        InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
730    })
731}
732
733// Validate one runtime value against the persisted field contract before field-
734// level structural encoding writes bytes into a row slot.
735#[allow(dead_code)]
736fn ensure_slot_value_matches_field_contract(
737    field: &FieldModel,
738    value: &Value,
739) -> Result<(), InternalError> {
740    if matches!(value, Value::Null) {
741        return Ok(());
742    }
743
744    if matches!(field.kind(), FieldKind::Structured { queryable: false })
745        && matches!(field.storage_decode(), FieldStorageDecode::Value)
746    {
747        return ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value);
748    }
749
750    let field_type = field_type_from_model_kind(&field.kind());
751    if !literal_matches_type(value, &field_type) {
752        return Err(InternalError::persisted_row_field_encode_failed(
753            field.name(),
754            format!(
755                "field kind {:?} does not accept runtime value {value:?}",
756                field.kind()
757            ),
758        ));
759    }
760
761    ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
762    ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value)
763}
764
765// Enforce fixed decimal scales through nested collection/map shapes before a
766// field-level patch value is persisted.
767#[allow(dead_code)]
768fn ensure_decimal_scale_matches(
769    field_name: &str,
770    kind: FieldKind,
771    value: &Value,
772) -> Result<(), InternalError> {
773    if matches!(value, Value::Null) {
774        return Ok(());
775    }
776
777    match (kind, value) {
778        (FieldKind::Decimal { scale }, Value::Decimal(decimal)) => {
779            if decimal.scale() != scale {
780                return Err(InternalError::persisted_row_field_encode_failed(
781                    field_name,
782                    format!(
783                        "decimal scale mismatch: expected {scale}, found {}",
784                        decimal.scale()
785                    ),
786                ));
787            }
788
789            Ok(())
790        }
791        (FieldKind::Relation { key_kind, .. }, value) => {
792            ensure_decimal_scale_matches(field_name, *key_kind, value)
793        }
794        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
795            for item in items {
796                ensure_decimal_scale_matches(field_name, *inner, item)?;
797            }
798
799            Ok(())
800        }
801        (
802            FieldKind::Map {
803                key,
804                value: map_value,
805            },
806            Value::Map(entries),
807        ) => {
808            for (entry_key, entry_value) in entries {
809                ensure_decimal_scale_matches(field_name, *key, entry_key)?;
810                ensure_decimal_scale_matches(field_name, *map_value, entry_value)?;
811            }
812
813            Ok(())
814        }
815        _ => Ok(()),
816    }
817}
818
819// Enforce the canonical persisted ordering rules for set/map shapes before one
820// field-level patch value becomes row bytes.
821#[allow(dead_code)]
822fn ensure_value_is_deterministic_for_storage(
823    field_name: &str,
824    kind: FieldKind,
825    value: &Value,
826) -> Result<(), InternalError> {
827    match (kind, value) {
828        (FieldKind::Set(_), Value::List(items)) => {
829            for pair in items.windows(2) {
830                let [left, right] = pair else {
831                    continue;
832                };
833                if Value::canonical_cmp(left, right) != Ordering::Less {
834                    return Err(InternalError::persisted_row_field_encode_failed(
835                        field_name,
836                        "set payload must already be canonical and deduplicated",
837                    ));
838                }
839            }
840
841            Ok(())
842        }
843        (FieldKind::Map { .. }, Value::Map(entries)) => {
844            Value::validate_map_entries(entries.as_slice())
845                .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?;
846
847            if !Value::map_entries_are_strictly_canonical(entries.as_slice()) {
848                return Err(InternalError::persisted_row_field_encode_failed(
849                    field_name,
850                    "map payload must already be canonical and deduplicated",
851                ));
852            }
853
854            Ok(())
855        }
856        _ => Ok(()),
857    }
858}
859
860// Materialize the last-write-wins serialized patch view indexed by stable slot.
861fn serialized_patch_payload_by_slot<'a>(
862    model: &'static EntityModel,
863    patch: &'a SerializedUpdatePatch,
864) -> Result<Vec<Option<&'a [u8]>>, InternalError> {
865    let mut payloads = vec![None; model.fields().len()];
866
867    for entry in patch.entries() {
868        let slot = entry.slot().index();
869        field_model_for_slot(model, slot)?;
870        payloads[slot] = Some(entry.payload());
871    }
872
873    Ok(payloads)
874}
875
876// Encode one `ByKind` field payload into the raw CBOR shape expected by the
877// structural field decoder.
878fn encode_structural_field_bytes_by_kind(
879    kind: FieldKind,
880    value: &Value,
881    field_name: &str,
882) -> Result<Vec<u8>, InternalError> {
883    let cbor_value = encode_structural_field_cbor_by_kind(kind, value, field_name)?;
884
885    serialize(&cbor_value)
886        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
887}
888
889// Encode one `ByKind` field payload into its raw CBOR value form.
890fn encode_structural_field_cbor_by_kind(
891    kind: FieldKind,
892    value: &Value,
893    field_name: &str,
894) -> Result<CborValue, InternalError> {
895    match (kind, value) {
896        (_, Value::Null) => Ok(CborValue::Null),
897        (FieldKind::Blob, Value::Blob(value)) => Ok(CborValue::Bytes(value.clone())),
898        (FieldKind::Bool, Value::Bool(value)) => Ok(CborValue::Bool(*value)),
899        (FieldKind::Text, Value::Text(value)) => Ok(CborValue::Text(value.clone())),
900        (FieldKind::Int, Value::Int(value)) => Ok(CborValue::Integer(i128::from(*value))),
901        (FieldKind::Uint, Value::Uint(value)) => Ok(CborValue::Integer(i128::from(*value))),
902        (FieldKind::Float32, Value::Float32(value)) => to_cbor_value(value)
903            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
904        (FieldKind::Float64, Value::Float64(value)) => to_cbor_value(value)
905            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
906        (FieldKind::Int128, Value::Int128(value)) => to_cbor_value(value)
907            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
908        (FieldKind::Uint128, Value::Uint128(value)) => to_cbor_value(value)
909            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
910        (FieldKind::Ulid, Value::Ulid(value)) => Ok(CborValue::Text(value.to_string())),
911        (FieldKind::Account, Value::Account(value)) => encode_leaf_cbor_value(value, field_name),
912        (FieldKind::Date, Value::Date(value)) => encode_leaf_cbor_value(value, field_name),
913        (FieldKind::Decimal { .. }, Value::Decimal(value)) => {
914            encode_leaf_cbor_value(value, field_name)
915        }
916        (FieldKind::Duration, Value::Duration(value)) => encode_leaf_cbor_value(value, field_name),
917        (FieldKind::IntBig, Value::IntBig(value)) => encode_leaf_cbor_value(value, field_name),
918        (FieldKind::Principal, Value::Principal(value)) => {
919            encode_leaf_cbor_value(value, field_name)
920        }
921        (FieldKind::Subaccount, Value::Subaccount(value)) => {
922            encode_leaf_cbor_value(value, field_name)
923        }
924        (FieldKind::Timestamp, Value::Timestamp(value)) => {
925            encode_leaf_cbor_value(value, field_name)
926        }
927        (FieldKind::UintBig, Value::UintBig(value)) => encode_leaf_cbor_value(value, field_name),
928        (FieldKind::Unit, Value::Unit) => encode_leaf_cbor_value(&(), field_name),
929        (FieldKind::Relation { key_kind, .. }, value) => {
930            encode_structural_field_cbor_by_kind(*key_kind, value, field_name)
931        }
932        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
933            Ok(CborValue::Array(
934                items
935                    .iter()
936                    .map(|item| encode_structural_field_cbor_by_kind(*inner, item, field_name))
937                    .collect::<Result<Vec<_>, _>>()?,
938            ))
939        }
940        (FieldKind::Map { key, value }, Value::Map(entries)) => {
941            let mut encoded = BTreeMap::new();
942            for (entry_key, entry_value) in entries {
943                encoded.insert(
944                    encode_structural_field_cbor_by_kind(*key, entry_key, field_name)?,
945                    encode_structural_field_cbor_by_kind(*value, entry_value, field_name)?,
946                );
947            }
948
949            Ok(CborValue::Map(encoded))
950        }
951        (FieldKind::Enum { path, variants }, Value::Enum(value)) => {
952            encode_enum_cbor_value(path, variants, value, field_name)
953        }
954        (FieldKind::Structured { .. }, _) => Err(InternalError::persisted_row_field_encode_failed(
955            field_name,
956            "structured ByKind field encoding is unsupported",
957        )),
958        _ => Err(InternalError::persisted_row_field_encode_failed(
959            field_name,
960            format!("field kind {kind:?} does not accept runtime value {value:?}"),
961        )),
962    }
963}
964
965// Encode one typed leaf wrapper into its raw CBOR value form.
966fn encode_leaf_cbor_value<T>(value: &T, field_name: &str) -> Result<CborValue, InternalError>
967where
968    T: serde::Serialize,
969{
970    to_cbor_value(value)
971        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
972}
973
974// Encode one enum field using the same unit-vs-one-entry-map envelope expected
975// by structural enum decode.
976fn encode_enum_cbor_value(
977    path: &'static str,
978    variants: &'static [crate::model::field::EnumVariantModel],
979    value: &ValueEnum,
980    field_name: &str,
981) -> Result<CborValue, InternalError> {
982    if let Some(actual_path) = value.path()
983        && actual_path != path
984    {
985        return Err(InternalError::persisted_row_field_encode_failed(
986            field_name,
987            format!("enum path mismatch: expected '{path}', found '{actual_path}'"),
988        ));
989    }
990
991    let Some(payload) = value.payload() else {
992        return Ok(CborValue::Text(value.variant().to_string()));
993    };
994
995    let Some(variant_model) = variants.iter().find(|item| item.ident() == value.variant()) else {
996        return Err(InternalError::persisted_row_field_encode_failed(
997            field_name,
998            format!(
999                "unknown enum variant '{}' for path '{path}'",
1000                value.variant()
1001            ),
1002        ));
1003    };
1004    let Some(payload_kind) = variant_model.payload_kind() else {
1005        return Err(InternalError::persisted_row_field_encode_failed(
1006            field_name,
1007            format!(
1008                "enum variant '{}' does not accept a payload",
1009                value.variant()
1010            ),
1011        ));
1012    };
1013
1014    let payload_value = match variant_model.payload_storage_decode() {
1015        FieldStorageDecode::ByKind => {
1016            encode_structural_field_cbor_by_kind(*payload_kind, payload, field_name)?
1017        }
1018        FieldStorageDecode::Value => to_cbor_value(payload)
1019            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?,
1020    };
1021
1022    let mut encoded = BTreeMap::new();
1023    encoded.insert(CborValue::Text(value.variant().to_string()), payload_value);
1024
1025    Ok(CborValue::Map(encoded))
1026}
1027
1028// Resolve one field model entry by stable slot index.
1029fn field_model_for_slot(
1030    model: &'static EntityModel,
1031    slot: usize,
1032) -> Result<&'static FieldModel, InternalError> {
1033    model
1034        .fields()
1035        .get(slot)
1036        .ok_or_else(|| InternalError::persisted_row_slot_lookup_out_of_bounds(model.path(), slot))
1037}
1038
1039///
1040/// ScalarValueRef
1041///
1042/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
1043/// slot-reader fast path.
1044/// It preserves cheap references for text/blob payloads while keeping fixed
1045/// width scalar wrappers as copy values.
1046///
1047
1048#[derive(Clone, Copy, Debug)]
1049pub enum ScalarValueRef<'a> {
1050    Blob(&'a [u8]),
1051    Bool(bool),
1052    Date(Date),
1053    Duration(Duration),
1054    Float32(Float32),
1055    Float64(Float64),
1056    Int(i64),
1057    Principal(Principal),
1058    Subaccount(Subaccount),
1059    Text(&'a str),
1060    Timestamp(Timestamp),
1061    Uint(u64),
1062    Ulid(Ulid),
1063    Unit,
1064}
1065
1066impl ScalarValueRef<'_> {
1067    /// Materialize this scalar view into the runtime `Value` enum.
1068    #[must_use]
1069    pub fn into_value(self) -> Value {
1070        match self {
1071            Self::Blob(value) => Value::Blob(value.to_vec()),
1072            Self::Bool(value) => Value::Bool(value),
1073            Self::Date(value) => Value::Date(value),
1074            Self::Duration(value) => Value::Duration(value),
1075            Self::Float32(value) => Value::Float32(value),
1076            Self::Float64(value) => Value::Float64(value),
1077            Self::Int(value) => Value::Int(value),
1078            Self::Principal(value) => Value::Principal(value),
1079            Self::Subaccount(value) => Value::Subaccount(value),
1080            Self::Text(value) => Value::Text(value.to_owned()),
1081            Self::Timestamp(value) => Value::Timestamp(value),
1082            Self::Uint(value) => Value::Uint(value),
1083            Self::Ulid(value) => Value::Ulid(value),
1084            Self::Unit => Value::Unit,
1085        }
1086    }
1087}
1088
1089///
1090/// ScalarSlotValueRef
1091///
1092/// ScalarSlotValueRef preserves the distinction between a missing slot and an
1093/// explicitly persisted `NULL` scalar payload.
1094/// The outer `Option` from `SlotReader::get_scalar` therefore still means
1095/// "slot absent".
1096///
1097
1098#[derive(Clone, Copy, Debug)]
1099pub enum ScalarSlotValueRef<'a> {
1100    Null,
1101    Value(ScalarValueRef<'a>),
1102}
1103
1104///
1105/// PersistedScalar
1106///
1107/// PersistedScalar defines the canonical binary payload codec for one scalar
1108/// leaf type.
1109/// Derive-generated persisted-row materializers and writers use this trait to
1110/// avoid routing scalar fields back through CBOR.
1111///
1112
1113pub trait PersistedScalar: Sized {
1114    /// Canonical scalar codec identifier used by schema/runtime metadata.
1115    const CODEC: ScalarCodec;
1116
1117    /// Encode this scalar value into its codec-specific payload bytes.
1118    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;
1119
1120    /// Decode this scalar value from its codec-specific payload bytes.
1121    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
1122    -> Result<Self, InternalError>;
1123}
1124
1125/// Encode one persisted slot payload using the shared leaf codec boundary.
1126pub fn encode_persisted_slot_payload<T>(
1127    value: &T,
1128    field_name: &'static str,
1129) -> Result<Vec<u8>, InternalError>
1130where
1131    T: serde::Serialize,
1132{
1133    serialize(value)
1134        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
1135}
1136
1137/// Encode one persisted scalar slot payload using the canonical scalar envelope.
1138pub fn encode_persisted_scalar_slot_payload<T>(
1139    value: &T,
1140    field_name: &'static str,
1141) -> Result<Vec<u8>, InternalError>
1142where
1143    T: PersistedScalar,
1144{
1145    let payload = value.encode_scalar_payload()?;
1146    let mut encoded = Vec::with_capacity(payload.len() + 2);
1147    encoded.push(SCALAR_SLOT_PREFIX);
1148    encoded.push(SCALAR_SLOT_TAG_VALUE);
1149    encoded.extend_from_slice(&payload);
1150
1151    if encoded.len() < 2 {
1152        return Err(InternalError::persisted_row_field_encode_failed(
1153            field_name,
1154            "scalar payload envelope underflow",
1155        ));
1156    }
1157
1158    Ok(encoded)
1159}
1160
1161/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
1162pub fn encode_persisted_option_scalar_slot_payload<T>(
1163    value: &Option<T>,
1164    field_name: &'static str,
1165) -> Result<Vec<u8>, InternalError>
1166where
1167    T: PersistedScalar,
1168{
1169    match value {
1170        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
1171        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
1172    }
1173}
1174
1175/// Decode one persisted slot payload using the shared leaf codec boundary.
1176pub fn decode_persisted_slot_payload<T>(
1177    bytes: &[u8],
1178    field_name: &'static str,
1179) -> Result<T, InternalError>
1180where
1181    T: serde::de::DeserializeOwned,
1182{
1183    deserialize(bytes)
1184        .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1185}
1186
1187/// Decode one persisted scalar slot payload using the canonical scalar envelope.
1188pub fn decode_persisted_scalar_slot_payload<T>(
1189    bytes: &[u8],
1190    field_name: &'static str,
1191) -> Result<T, InternalError>
1192where
1193    T: PersistedScalar,
1194{
1195    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
1196        InternalError::persisted_row_field_decode_failed(
1197            field_name,
1198            "unexpected null for non-nullable scalar field",
1199        )
1200    })?;
1201
1202    T::decode_scalar_payload(payload, field_name)
1203}
1204
1205/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
1206pub fn decode_persisted_option_scalar_slot_payload<T>(
1207    bytes: &[u8],
1208    field_name: &'static str,
1209) -> Result<Option<T>, InternalError>
1210where
1211    T: PersistedScalar,
1212{
1213    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
1214        return Ok(None);
1215    };
1216
1217    T::decode_scalar_payload(payload, field_name).map(Some)
1218}
1219
1220///
1221/// SlotBufferWriter
1222///
1223/// SlotBufferWriter captures one dense canonical row worth of slot payloads
1224/// before they are encoded into the canonical slot container.
1225///
1226
1227pub(in crate::db) struct SlotBufferWriter {
1228    model: &'static EntityModel,
1229    slots: Vec<SlotBufferSlot>,
1230}
1231
1232impl SlotBufferWriter {
1233    /// Build one empty slot buffer for one entity model.
1234    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
1235        Self {
1236            model,
1237            slots: vec![SlotBufferSlot::Missing; model.fields().len()],
1238        }
1239    }
1240
1241    /// Encode the buffered slots into the canonical row payload.
1242    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
1243        let slot_count = self.slots.len();
1244        let mut payload_bytes = Vec::new();
1245        let mut slot_table = Vec::with_capacity(slot_count);
1246
1247        // Phase 1: require one payload for every declared slot before the row
1248        // can cross the canonical persisted-row boundary.
1249        for (slot, slot_payload) in self.slots.into_iter().enumerate() {
1250            match slot_payload {
1251                SlotBufferSlot::Set(bytes) => {
1252                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
1253                        InternalError::persisted_row_encode_failed(
1254                            "slot payload start exceeds u32 range",
1255                        )
1256                    })?;
1257                    let len = u32::try_from(bytes.len()).map_err(|_| {
1258                        InternalError::persisted_row_encode_failed(
1259                            "slot payload length exceeds u32 range",
1260                        )
1261                    })?;
1262                    payload_bytes.extend_from_slice(&bytes);
1263                    slot_table.push((start, len));
1264                }
1265                SlotBufferSlot::Missing => {
1266                    return Err(InternalError::persisted_row_encode_failed(format!(
1267                        "slot buffer writer did not emit slot {slot} for entity '{}'",
1268                        self.model.path()
1269                    )));
1270                }
1271            }
1272        }
1273
1274        // Phase 2: flatten the slot table plus payload bytes into the canonical row image.
1275        encode_slot_payload_from_parts(slot_count, slot_table.as_slice(), payload_bytes.as_slice())
1276    }
1277}
1278
1279impl SlotWriter for SlotBufferWriter {
1280    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1281        let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
1282        let payload = required_slot_payload_bytes(self.model, "slot buffer writer", slot, payload)?;
1283        *entry = SlotBufferSlot::Set(payload.to_vec());
1284
1285        Ok(())
1286    }
1287}
1288
1289///
1290/// SlotBufferSlot
1291///
1292/// SlotBufferSlot tracks whether one canonical row encoder has emitted a
1293/// payload for every declared slot before flattening the row payload.
1294///
1295
1296#[derive(Clone, Debug, Eq, PartialEq)]
1297enum SlotBufferSlot {
1298    Missing,
1299    Set(Vec<u8>),
1300}
1301
1302///
1303/// SerializedPatchWriter
1304///
1305/// SerializedPatchWriter
1306///
1307/// SerializedPatchWriter captures a dense typed entity slot image into the
1308/// serialized patch artifact used by `0.64` mutation staging.
1309/// Unlike `SlotBufferWriter`, this writer does not flatten into one row payload;
1310/// it preserves slot-level ownership so later stages can replay the row through
1311/// the structural patch boundary.
1312///
1313
1314struct SerializedPatchWriter {
1315    model: &'static EntityModel,
1316    slots: Vec<PatchWriterSlot>,
1317}
1318
1319impl SerializedPatchWriter {
1320    /// Build one empty serialized patch writer for one entity model.
1321    fn for_model(model: &'static EntityModel) -> Self {
1322        Self {
1323            model,
1324            slots: vec![PatchWriterSlot::Missing; model.fields().len()],
1325        }
1326    }
1327
1328    /// Materialize one dense serialized patch, erroring if the writer failed
1329    /// to emit any declared slot.
1330    fn finish_complete(self) -> Result<SerializedUpdatePatch, InternalError> {
1331        let mut entries = Vec::with_capacity(self.slots.len());
1332
1333        // Phase 1: require a complete slot image so typed save/update staging
1334        // stays equivalent to the existing full-row encoder.
1335        for (slot, payload) in self.slots.into_iter().enumerate() {
1336            let field_slot = FieldSlot::from_index(self.model, slot)?;
1337            let serialized = match payload {
1338                PatchWriterSlot::Set(payload) => SerializedFieldUpdate::new(field_slot, payload),
1339                PatchWriterSlot::Missing => {
1340                    return Err(InternalError::persisted_row_encode_failed(format!(
1341                        "serialized patch writer did not emit slot {slot} for entity '{}'",
1342                        self.model.path()
1343                    )));
1344                }
1345            };
1346            entries.push(serialized);
1347        }
1348
1349        Ok(SerializedUpdatePatch::new(entries))
1350    }
1351}
1352
1353impl SlotWriter for SerializedPatchWriter {
1354    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1355        let entry = slot_cell_mut(self.slots.as_mut_slice(), slot)?;
1356        let payload =
1357            required_slot_payload_bytes(self.model, "serialized patch writer", slot, payload)?;
1358        *entry = PatchWriterSlot::Set(payload.to_vec());
1359
1360        Ok(())
1361    }
1362}
1363
1364///
1365/// PatchWriterSlot
1366///
1367/// PatchWriterSlot
1368///
1369/// PatchWriterSlot tracks whether one dense slot-image writer has emitted a
1370/// payload or failed to visit the slot at all.
1371/// That lets the typed save/update bridge reject incomplete writers instead of
1372/// silently leaving stale bytes in the baseline row.
1373///
1374
1375#[derive(Clone, Debug, Eq, PartialEq)]
1376enum PatchWriterSlot {
1377    Missing,
1378    Set(Vec<u8>),
1379}
1380
1381///
1382/// StructuralSlotReader
1383///
1384/// StructuralSlotReader adapts the current persisted-row bytes into the
1385/// canonical slot-reader seam.
1386/// It validates row shape and fully decodes every declared field before any
1387/// consumer can observe the row, then keeps those decoded values cached so
1388/// later index/predicate reads do not re-run field decoders.
1389///
1390
1391pub(in crate::db) struct StructuralSlotReader<'a> {
1392    model: &'static EntityModel,
1393    field_bytes: StructuralRowFieldBytes<'a>,
1394    cached_values: Vec<CachedSlotValue>,
1395}
1396
1397impl<'a> StructuralSlotReader<'a> {
1398    /// Build one slot reader over one persisted row using the current structural row scanner.
1399    pub(in crate::db) fn from_raw_row(
1400        raw_row: &'a RawRow,
1401        model: &'static EntityModel,
1402    ) -> Result<Self, InternalError> {
1403        let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
1404            .map_err(StructuralRowDecodeError::into_internal_error)?;
1405        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
1406            .take(model.fields().len())
1407            .collect();
1408        let mut reader = Self {
1409            model,
1410            field_bytes,
1411            cached_values,
1412        };
1413
1414        // Phase 1: force every declared slot through the owned field decode
1415        // contract once so malformed but unreferenced payloads cannot stay
1416        // latent behind consumer-specific partial decode paths.
1417        reader.decode_all_declared_slots()?;
1418
1419        Ok(reader)
1420    }
1421
1422    /// Validate the decoded primary-key slot against the authoritative row key.
1423    pub(in crate::db) fn validate_storage_key(
1424        &self,
1425        data_key: &DataKey,
1426    ) -> Result<(), InternalError> {
1427        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
1428            return Err(InternalError::persisted_row_primary_key_field_missing(
1429                self.model.path(),
1430            ));
1431        };
1432        let field = self.field_model(primary_key_slot)?;
1433        let decoded_key = match self.get_scalar(primary_key_slot)? {
1434            Some(ScalarSlotValueRef::Null) => None,
1435            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
1436            None => Some(
1437                decode_storage_key_field_bytes(
1438                    self.required_field_bytes(primary_key_slot, field.name())?,
1439                    field.kind,
1440                )
1441                .map_err(|err| {
1442                    InternalError::persisted_row_primary_key_not_storage_encodable(data_key, err)
1443                })?,
1444            ),
1445        };
1446        let Some(decoded_key) = decoded_key else {
1447            return Err(InternalError::persisted_row_primary_key_slot_missing(
1448                data_key,
1449            ));
1450        };
1451        let expected_key = data_key.storage_key();
1452
1453        if decoded_key != expected_key {
1454            return Err(InternalError::persisted_row_key_mismatch(
1455                expected_key,
1456                decoded_key,
1457            ));
1458        }
1459
1460        Ok(())
1461    }
1462
1463    // Resolve one field model entry by stable slot index.
1464    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
1465        field_model_for_slot(self.model, slot)
1466    }
1467
1468    // Decode every declared slot exactly once at the structural row boundary so
1469    // later consumers inherit one globally enforced canonical-row contract.
1470    fn decode_all_declared_slots(&mut self) -> Result<(), InternalError> {
1471        for slot in 0..self.model.fields().len() {
1472            let _ = self.get_value(slot)?;
1473        }
1474
1475        Ok(())
1476    }
1477
1478    // Borrow one declared slot payload, treating absence as a persisted-row
1479    // invariant violation instead of a normal structural branch.
1480    pub(in crate::db) fn required_field_bytes(
1481        &self,
1482        slot: usize,
1483        field_name: &str,
1484    ) -> Result<&[u8], InternalError> {
1485        self.field_bytes
1486            .field(slot)
1487            .ok_or_else(|| InternalError::persisted_row_declared_field_missing(field_name))
1488    }
1489}
1490
1491// Convert one scalar slot fast-path value into its storage-key form when the
1492// field kind is storage-key-compatible.
1493const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
1494    match value {
1495        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
1496        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
1497        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
1498        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
1499        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
1500        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
1501        ScalarValueRef::Unit => Some(StorageKey::Unit),
1502        _ => None,
1503    }
1504}
1505
1506impl SlotReader for StructuralSlotReader<'_> {
1507    fn model(&self) -> &'static EntityModel {
1508        self.model
1509    }
1510
1511    fn has(&self, slot: usize) -> bool {
1512        self.field_bytes.field(slot).is_some()
1513    }
1514
1515    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
1516        self.field_bytes.field(slot)
1517    }
1518
1519    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
1520        let field = self.field_model(slot)?;
1521
1522        match field.leaf_codec() {
1523            LeafCodec::Scalar(codec) => decode_scalar_slot_value(
1524                self.required_field_bytes(slot, field.name())?,
1525                codec,
1526                field.name(),
1527            )
1528            .map(Some),
1529            LeafCodec::CborFallback => Ok(None),
1530        }
1531    }
1532
1533    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
1534        let cached = self.cached_values.get(slot).ok_or_else(|| {
1535            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(self.model.path(), slot)
1536        })?;
1537        if let CachedSlotValue::Decoded(value) = cached {
1538            return Ok(Some(value.clone()));
1539        }
1540
1541        let field = self.field_model(slot)?;
1542        let value = decode_slot_value_from_bytes(
1543            self.model,
1544            slot,
1545            self.required_field_bytes(slot, field.name())?,
1546        )?;
1547        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
1548
1549        Ok(Some(value))
1550    }
1551}
1552
1553impl CanonicalSlotReader for StructuralSlotReader<'_> {}
1554
1555///
1556/// CachedSlotValue
1557///
1558/// CachedSlotValue tracks whether one slot has already been decoded during the
1559/// current structural row access pass.
1560///
1561
1562enum CachedSlotValue {
1563    Pending,
1564    Decoded(Value),
1565}
1566
1567// Encode one scalar slot value into the canonical prefixed scalar envelope.
1568fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
1569    match value {
1570        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
1571        ScalarSlotValueRef::Value(value) => {
1572            let mut encoded = Vec::new();
1573            encoded.push(SCALAR_SLOT_PREFIX);
1574            encoded.push(SCALAR_SLOT_TAG_VALUE);
1575
1576            match value {
1577                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
1578                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
1579                ScalarValueRef::Date(value) => {
1580                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
1581                }
1582                ScalarValueRef::Duration(value) => {
1583                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
1584                }
1585                ScalarValueRef::Float32(value) => {
1586                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
1587                }
1588                ScalarValueRef::Float64(value) => {
1589                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
1590                }
1591                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
1592                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
1593                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
1594                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
1595                ScalarValueRef::Timestamp(value) => {
1596                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
1597                }
1598                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
1599                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
1600                ScalarValueRef::Unit => {}
1601            }
1602
1603            encoded
1604        }
1605    }
1606}
1607
1608// Split one scalar slot envelope into `NULL` vs payload bytes.
1609fn decode_scalar_slot_payload_body<'a>(
1610    bytes: &'a [u8],
1611    field_name: &'static str,
1612) -> Result<Option<&'a [u8]>, InternalError> {
1613    let Some((&prefix, rest)) = bytes.split_first() else {
1614        return Err(InternalError::persisted_row_field_decode_failed(
1615            field_name,
1616            "empty scalar payload",
1617        ));
1618    };
1619    if prefix != SCALAR_SLOT_PREFIX {
1620        return Err(InternalError::persisted_row_field_decode_failed(
1621            field_name,
1622            format!(
1623                "scalar payload prefix mismatch: expected slot envelope prefix byte 0x{SCALAR_SLOT_PREFIX:02X}, found 0x{prefix:02X}",
1624            ),
1625        ));
1626    }
1627    let Some((&tag, payload)) = rest.split_first() else {
1628        return Err(InternalError::persisted_row_field_decode_failed(
1629            field_name,
1630            "truncated scalar payload tag",
1631        ));
1632    };
1633
1634    match tag {
1635        SCALAR_SLOT_TAG_NULL => {
1636            if !payload.is_empty() {
1637                return Err(InternalError::persisted_row_field_decode_failed(
1638                    field_name,
1639                    "null scalar payload has trailing bytes",
1640                ));
1641            }
1642
1643            Ok(None)
1644        }
1645        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
1646        _ => Err(InternalError::persisted_row_field_decode_failed(
1647            field_name,
1648            format!("invalid scalar payload tag {tag}"),
1649        )),
1650    }
1651}
1652
1653// Decode one scalar slot view using the field-declared scalar codec.
1654#[expect(clippy::too_many_lines)]
1655fn decode_scalar_slot_value<'a>(
1656    bytes: &'a [u8],
1657    codec: ScalarCodec,
1658    field_name: &'static str,
1659) -> Result<ScalarSlotValueRef<'a>, InternalError> {
1660    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
1661        return Ok(ScalarSlotValueRef::Null);
1662    };
1663
1664    let value = match codec {
1665        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
1666        ScalarCodec::Bool => {
1667            let [value] = payload else {
1668                return Err(
1669                    InternalError::persisted_row_field_payload_exact_len_required(
1670                        field_name,
1671                        "bool",
1672                        SCALAR_BOOL_PAYLOAD_LEN,
1673                    ),
1674                );
1675            };
1676            match *value {
1677                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
1678                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
1679                _ => {
1680                    return Err(InternalError::persisted_row_field_payload_invalid_byte(
1681                        field_name, "bool", *value,
1682                    ));
1683                }
1684            }
1685        }
1686        ScalarCodec::Date => {
1687            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1688                InternalError::persisted_row_field_payload_exact_len_required(
1689                    field_name,
1690                    "date",
1691                    SCALAR_WORD32_PAYLOAD_LEN,
1692                )
1693            })?;
1694            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
1695        }
1696        ScalarCodec::Duration => {
1697            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1698                InternalError::persisted_row_field_payload_exact_len_required(
1699                    field_name,
1700                    "duration",
1701                    SCALAR_WORD64_PAYLOAD_LEN,
1702                )
1703            })?;
1704            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
1705        }
1706        ScalarCodec::Float32 => {
1707            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1708                InternalError::persisted_row_field_payload_exact_len_required(
1709                    field_name,
1710                    "float32",
1711                    SCALAR_WORD32_PAYLOAD_LEN,
1712                )
1713            })?;
1714            let value = f32::from_bits(u32::from_le_bytes(bytes));
1715            let value = Float32::try_new(value).ok_or_else(|| {
1716                InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
1717            })?;
1718            ScalarValueRef::Float32(value)
1719        }
1720        ScalarCodec::Float64 => {
1721            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1722                InternalError::persisted_row_field_payload_exact_len_required(
1723                    field_name,
1724                    "float64",
1725                    SCALAR_WORD64_PAYLOAD_LEN,
1726                )
1727            })?;
1728            let value = f64::from_bits(u64::from_le_bytes(bytes));
1729            let value = Float64::try_new(value).ok_or_else(|| {
1730                InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
1731            })?;
1732            ScalarValueRef::Float64(value)
1733        }
1734        ScalarCodec::Int64 => {
1735            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1736                InternalError::persisted_row_field_payload_exact_len_required(
1737                    field_name,
1738                    "int",
1739                    SCALAR_WORD64_PAYLOAD_LEN,
1740                )
1741            })?;
1742            ScalarValueRef::Int(i64::from_le_bytes(bytes))
1743        }
1744        ScalarCodec::Principal => ScalarValueRef::Principal(
1745            Principal::try_from_bytes(payload)
1746                .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))?,
1747        ),
1748        ScalarCodec::Subaccount => {
1749            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1750                InternalError::persisted_row_field_payload_exact_len_required(
1751                    field_name,
1752                    "subaccount",
1753                    SCALAR_SUBACCOUNT_PAYLOAD_LEN,
1754                )
1755            })?;
1756            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
1757        }
1758        ScalarCodec::Text => {
1759            let value = str::from_utf8(payload).map_err(|err| {
1760                InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
1761            })?;
1762            ScalarValueRef::Text(value)
1763        }
1764        ScalarCodec::Timestamp => {
1765            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1766                InternalError::persisted_row_field_payload_exact_len_required(
1767                    field_name,
1768                    "timestamp",
1769                    SCALAR_WORD64_PAYLOAD_LEN,
1770                )
1771            })?;
1772            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
1773        }
1774        ScalarCodec::Uint64 => {
1775            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1776                InternalError::persisted_row_field_payload_exact_len_required(
1777                    field_name,
1778                    "uint",
1779                    SCALAR_WORD64_PAYLOAD_LEN,
1780                )
1781            })?;
1782            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
1783        }
1784        ScalarCodec::Ulid => {
1785            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1786                InternalError::persisted_row_field_payload_exact_len_required(
1787                    field_name,
1788                    "ulid",
1789                    SCALAR_ULID_PAYLOAD_LEN,
1790                )
1791            })?;
1792            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
1793        }
1794        ScalarCodec::Unit => {
1795            if !payload.is_empty() {
1796                return Err(InternalError::persisted_row_field_payload_must_be_empty(
1797                    field_name, "unit",
1798                ));
1799            }
1800            ScalarValueRef::Unit
1801        }
1802    };
1803
1804    Ok(ScalarSlotValueRef::Value(value))
1805}
1806
1807macro_rules! impl_persisted_scalar_signed {
1808    ($($ty:ty),* $(,)?) => {
1809        $(
1810            impl PersistedScalar for $ty {
1811                const CODEC: ScalarCodec = ScalarCodec::Int64;
1812
1813                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1814                    Ok(i64::from(*self).to_le_bytes().to_vec())
1815                }
1816
1817                fn decode_scalar_payload(
1818                    bytes: &[u8],
1819                    field_name: &'static str,
1820                ) -> Result<Self, InternalError> {
1821                    let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1822                        InternalError::persisted_row_field_payload_exact_len_required(
1823                            field_name,
1824                            "int",
1825                            SCALAR_WORD64_PAYLOAD_LEN,
1826                        )
1827                    })?;
1828                    <$ty>::try_from(i64::from_le_bytes(raw)).map_err(|_| {
1829                        InternalError::persisted_row_field_payload_out_of_range(
1830                            field_name,
1831                            "integer",
1832                        )
1833                    })
1834                }
1835            }
1836        )*
1837    };
1838}
1839
1840macro_rules! impl_persisted_scalar_unsigned {
1841    ($($ty:ty),* $(,)?) => {
1842        $(
1843            impl PersistedScalar for $ty {
1844                const CODEC: ScalarCodec = ScalarCodec::Uint64;
1845
1846                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1847                    Ok(u64::from(*self).to_le_bytes().to_vec())
1848                }
1849
1850                fn decode_scalar_payload(
1851                    bytes: &[u8],
1852                    field_name: &'static str,
1853                ) -> Result<Self, InternalError> {
1854                    let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1855                        InternalError::persisted_row_field_payload_exact_len_required(
1856                            field_name,
1857                            "uint",
1858                            SCALAR_WORD64_PAYLOAD_LEN,
1859                        )
1860                    })?;
1861                    <$ty>::try_from(u64::from_le_bytes(raw)).map_err(|_| {
1862                        InternalError::persisted_row_field_payload_out_of_range(
1863                            field_name,
1864                            "unsigned",
1865                        )
1866                    })
1867                }
1868            }
1869        )*
1870    };
1871}
1872
1873impl_persisted_scalar_signed!(i8, i16, i32, i64);
1874impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
1875
1876impl PersistedScalar for bool {
1877    const CODEC: ScalarCodec = ScalarCodec::Bool;
1878
1879    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1880        Ok(vec![u8::from(*self)])
1881    }
1882
1883    fn decode_scalar_payload(
1884        bytes: &[u8],
1885        field_name: &'static str,
1886    ) -> Result<Self, InternalError> {
1887        let [value] = bytes else {
1888            return Err(
1889                InternalError::persisted_row_field_payload_exact_len_required(
1890                    field_name,
1891                    "bool",
1892                    SCALAR_BOOL_PAYLOAD_LEN,
1893                ),
1894            );
1895        };
1896
1897        match *value {
1898            SCALAR_BOOL_FALSE_TAG => Ok(false),
1899            SCALAR_BOOL_TRUE_TAG => Ok(true),
1900            _ => Err(InternalError::persisted_row_field_payload_invalid_byte(
1901                field_name, "bool", *value,
1902            )),
1903        }
1904    }
1905}
1906
1907impl PersistedScalar for String {
1908    const CODEC: ScalarCodec = ScalarCodec::Text;
1909
1910    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1911        Ok(self.as_bytes().to_vec())
1912    }
1913
1914    fn decode_scalar_payload(
1915        bytes: &[u8],
1916        field_name: &'static str,
1917    ) -> Result<Self, InternalError> {
1918        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
1919            InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
1920        })
1921    }
1922}
1923
1924impl PersistedScalar for Vec<u8> {
1925    const CODEC: ScalarCodec = ScalarCodec::Blob;
1926
1927    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1928        Ok(self.clone())
1929    }
1930
1931    fn decode_scalar_payload(
1932        bytes: &[u8],
1933        _field_name: &'static str,
1934    ) -> Result<Self, InternalError> {
1935        Ok(bytes.to_vec())
1936    }
1937}
1938
1939impl PersistedScalar for Blob {
1940    const CODEC: ScalarCodec = ScalarCodec::Blob;
1941
1942    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1943        Ok(self.to_vec())
1944    }
1945
1946    fn decode_scalar_payload(
1947        bytes: &[u8],
1948        _field_name: &'static str,
1949    ) -> Result<Self, InternalError> {
1950        Ok(Self::from(bytes))
1951    }
1952}
1953
1954impl PersistedScalar for Ulid {
1955    const CODEC: ScalarCodec = ScalarCodec::Ulid;
1956
1957    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1958        Ok(self.to_bytes().to_vec())
1959    }
1960
1961    fn decode_scalar_payload(
1962        bytes: &[u8],
1963        field_name: &'static str,
1964    ) -> Result<Self, InternalError> {
1965        Self::try_from_bytes(bytes)
1966            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1967    }
1968}
1969
1970impl PersistedScalar for Timestamp {
1971    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
1972
1973    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1974        Ok(self.as_millis().to_le_bytes().to_vec())
1975    }
1976
1977    fn decode_scalar_payload(
1978        bytes: &[u8],
1979        field_name: &'static str,
1980    ) -> Result<Self, InternalError> {
1981        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1982            InternalError::persisted_row_field_payload_exact_len_required(
1983                field_name,
1984                "timestamp",
1985                SCALAR_WORD64_PAYLOAD_LEN,
1986            )
1987        })?;
1988
1989        Ok(Self::from_millis(i64::from_le_bytes(raw)))
1990    }
1991}
1992
1993impl PersistedScalar for Date {
1994    const CODEC: ScalarCodec = ScalarCodec::Date;
1995
1996    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1997        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1998    }
1999
2000    fn decode_scalar_payload(
2001        bytes: &[u8],
2002        field_name: &'static str,
2003    ) -> Result<Self, InternalError> {
2004        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2005            InternalError::persisted_row_field_payload_exact_len_required(
2006                field_name,
2007                "date",
2008                SCALAR_WORD32_PAYLOAD_LEN,
2009            )
2010        })?;
2011
2012        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
2013    }
2014}
2015
2016impl PersistedScalar for Duration {
2017    const CODEC: ScalarCodec = ScalarCodec::Duration;
2018
2019    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2020        Ok(self.as_millis().to_le_bytes().to_vec())
2021    }
2022
2023    fn decode_scalar_payload(
2024        bytes: &[u8],
2025        field_name: &'static str,
2026    ) -> Result<Self, InternalError> {
2027        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2028            InternalError::persisted_row_field_payload_exact_len_required(
2029                field_name,
2030                "duration",
2031                SCALAR_WORD64_PAYLOAD_LEN,
2032            )
2033        })?;
2034
2035        Ok(Self::from_millis(u64::from_le_bytes(raw)))
2036    }
2037}
2038
2039impl PersistedScalar for Float32 {
2040    const CODEC: ScalarCodec = ScalarCodec::Float32;
2041
2042    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2043        Ok(self.get().to_bits().to_le_bytes().to_vec())
2044    }
2045
2046    fn decode_scalar_payload(
2047        bytes: &[u8],
2048        field_name: &'static str,
2049    ) -> Result<Self, InternalError> {
2050        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2051            InternalError::persisted_row_field_payload_exact_len_required(
2052                field_name,
2053                "float32",
2054                SCALAR_WORD32_PAYLOAD_LEN,
2055            )
2056        })?;
2057        let value = f32::from_bits(u32::from_le_bytes(raw));
2058
2059        Self::try_new(value).ok_or_else(|| {
2060            InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
2061        })
2062    }
2063}
2064
2065impl PersistedScalar for Float64 {
2066    const CODEC: ScalarCodec = ScalarCodec::Float64;
2067
2068    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2069        Ok(self.get().to_bits().to_le_bytes().to_vec())
2070    }
2071
2072    fn decode_scalar_payload(
2073        bytes: &[u8],
2074        field_name: &'static str,
2075    ) -> Result<Self, InternalError> {
2076        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2077            InternalError::persisted_row_field_payload_exact_len_required(
2078                field_name,
2079                "float64",
2080                SCALAR_WORD64_PAYLOAD_LEN,
2081            )
2082        })?;
2083        let value = f64::from_bits(u64::from_le_bytes(raw));
2084
2085        Self::try_new(value).ok_or_else(|| {
2086            InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
2087        })
2088    }
2089}
2090
2091impl PersistedScalar for Principal {
2092    const CODEC: ScalarCodec = ScalarCodec::Principal;
2093
2094    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2095        self.to_bytes()
2096            .map_err(|err| InternalError::persisted_row_field_encode_failed("principal", err))
2097    }
2098
2099    fn decode_scalar_payload(
2100        bytes: &[u8],
2101        field_name: &'static str,
2102    ) -> Result<Self, InternalError> {
2103        Self::try_from_bytes(bytes)
2104            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
2105    }
2106}
2107
2108impl PersistedScalar for Subaccount {
2109    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
2110
2111    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2112        Ok(self.to_bytes().to_vec())
2113    }
2114
2115    fn decode_scalar_payload(
2116        bytes: &[u8],
2117        field_name: &'static str,
2118    ) -> Result<Self, InternalError> {
2119        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
2120            InternalError::persisted_row_field_payload_exact_len_required(
2121                field_name,
2122                "subaccount",
2123                SCALAR_SUBACCOUNT_PAYLOAD_LEN,
2124            )
2125        })?;
2126
2127        Ok(Self::from_array(raw))
2128    }
2129}
2130
2131impl PersistedScalar for () {
2132    const CODEC: ScalarCodec = ScalarCodec::Unit;
2133
2134    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
2135        Ok(Vec::new())
2136    }
2137
2138    fn decode_scalar_payload(
2139        bytes: &[u8],
2140        field_name: &'static str,
2141    ) -> Result<Self, InternalError> {
2142        if !bytes.is_empty() {
2143            return Err(InternalError::persisted_row_field_payload_must_be_empty(
2144                field_name, "unit",
2145            ));
2146        }
2147
2148        Ok(())
2149    }
2150}
2151
2152///
2153/// TESTS
2154///
2155
2156#[cfg(test)]
2157mod tests {
2158    use super::{
2159        FieldSlot, ScalarSlotValueRef, ScalarValueRef, SerializedFieldUpdate,
2160        SerializedPatchWriter, SerializedUpdatePatch, SlotBufferWriter, SlotReader, SlotWriter,
2161        UpdatePatch, apply_serialized_update_patch_to_raw_row, apply_update_patch_to_raw_row,
2162        decode_slot_value_by_contract, decode_slot_value_from_bytes, encode_scalar_slot_value,
2163        encode_slot_payload_from_parts, encode_slot_value_from_value,
2164        serialize_entity_slots_as_update_patch, serialize_update_patch_fields,
2165    };
2166    use crate::{
2167        db::{
2168            codec::serialize_row_payload,
2169            data::{RawRow, StructuralSlotReader},
2170        },
2171        error::InternalError,
2172        model::{
2173            EntityModel,
2174            field::{EnumVariantModel, FieldKind, FieldModel, FieldStorageDecode},
2175        },
2176        testing::SIMPLE_ENTITY_TAG,
2177        traits::EntitySchema,
2178        types::{Account, Principal, Subaccount},
2179        value::{Value, ValueEnum},
2180    };
2181    use icydb_derive::{FieldProjection, PersistedRow};
2182    use serde::{Deserialize, Serialize};
2183
2184    crate::test_canister! {
2185        ident = PersistedRowPatchBridgeCanister,
2186        commit_memory_id = crate::testing::test_commit_memory_id(),
2187    }
2188
2189    crate::test_store! {
2190        ident = PersistedRowPatchBridgeStore,
2191        canister = PersistedRowPatchBridgeCanister,
2192    }
2193
2194    ///
2195    /// PersistedRowPatchBridgeEntity
2196    ///
2197    /// PersistedRowPatchBridgeEntity
2198    ///
2199    /// PersistedRowPatchBridgeEntity is the smallest derive-owned entity used
2200    /// to validate the typed-entity -> serialized-patch bridge.
2201    /// It lets the persisted-row tests exercise the same dense slot writer the
2202    /// save/update path now uses.
2203    ///
2204
2205    #[derive(
2206        Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, PersistedRow, Serialize,
2207    )]
2208    struct PersistedRowPatchBridgeEntity {
2209        id: crate::types::Ulid,
2210        name: String,
2211    }
2212
2213    crate::test_entity_schema! {
2214        ident = PersistedRowPatchBridgeEntity,
2215        id = crate::types::Ulid,
2216        id_field = id,
2217        entity_name = "PersistedRowPatchBridgeEntity",
2218        entity_tag = SIMPLE_ENTITY_TAG,
2219        pk_index = 0,
2220        fields = [
2221            ("id", FieldKind::Ulid),
2222            ("name", FieldKind::Text),
2223        ],
2224        indexes = [],
2225        store = PersistedRowPatchBridgeStore,
2226        canister = PersistedRowPatchBridgeCanister,
2227    }
2228
2229    static STATE_VARIANTS: &[EnumVariantModel] = &[EnumVariantModel::new(
2230        "Loaded",
2231        Some(&FieldKind::Uint),
2232        FieldStorageDecode::ByKind,
2233    )];
2234    static FIELD_MODELS: [FieldModel; 2] = [
2235        FieldModel::new("name", FieldKind::Text),
2236        FieldModel::new_with_storage_decode("payload", FieldKind::Text, FieldStorageDecode::Value),
2237    ];
2238    static LIST_FIELD_MODELS: [FieldModel; 1] =
2239        [FieldModel::new("tags", FieldKind::List(&FieldKind::Text))];
2240    static MAP_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
2241        "props",
2242        FieldKind::Map {
2243            key: &FieldKind::Text,
2244            value: &FieldKind::Uint,
2245        },
2246    )];
2247    static ENUM_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
2248        "state",
2249        FieldKind::Enum {
2250            path: "tests::State",
2251            variants: STATE_VARIANTS,
2252        },
2253    )];
2254    static ACCOUNT_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new("owner", FieldKind::Account)];
2255    static INDEX_MODELS: [&crate::model::index::IndexModel; 0] = [];
2256    static TEST_MODEL: EntityModel = EntityModel::new(
2257        "tests::PersistedRowFieldCodecEntity",
2258        "persisted_row_field_codec_entity",
2259        &FIELD_MODELS[0],
2260        &FIELD_MODELS,
2261        &INDEX_MODELS,
2262    );
2263    static LIST_MODEL: EntityModel = EntityModel::new(
2264        "tests::PersistedRowListFieldCodecEntity",
2265        "persisted_row_list_field_codec_entity",
2266        &LIST_FIELD_MODELS[0],
2267        &LIST_FIELD_MODELS,
2268        &INDEX_MODELS,
2269    );
2270    static MAP_MODEL: EntityModel = EntityModel::new(
2271        "tests::PersistedRowMapFieldCodecEntity",
2272        "persisted_row_map_field_codec_entity",
2273        &MAP_FIELD_MODELS[0],
2274        &MAP_FIELD_MODELS,
2275        &INDEX_MODELS,
2276    );
2277    static ENUM_MODEL: EntityModel = EntityModel::new(
2278        "tests::PersistedRowEnumFieldCodecEntity",
2279        "persisted_row_enum_field_codec_entity",
2280        &ENUM_FIELD_MODELS[0],
2281        &ENUM_FIELD_MODELS,
2282        &INDEX_MODELS,
2283    );
2284    static ACCOUNT_MODEL: EntityModel = EntityModel::new(
2285        "tests::PersistedRowAccountFieldCodecEntity",
2286        "persisted_row_account_field_codec_entity",
2287        &ACCOUNT_FIELD_MODELS[0],
2288        &ACCOUNT_FIELD_MODELS,
2289        &INDEX_MODELS,
2290    );
2291
2292    fn encode_slot_payload_allowing_missing_for_tests(
2293        model: &'static EntityModel,
2294        slots: &[Option<&[u8]>],
2295    ) -> Result<Vec<u8>, InternalError> {
2296        if slots.len() != model.fields().len() {
2297            return Err(InternalError::persisted_row_encode_failed(format!(
2298                "noncanonical slot payload test helper expected {} slots for entity '{}', found {}",
2299                model.fields().len(),
2300                model.path(),
2301                slots.len()
2302            )));
2303        }
2304        let mut payload_bytes = Vec::new();
2305        let mut slot_table = Vec::with_capacity(slots.len());
2306
2307        for slot_payload in slots {
2308            match slot_payload {
2309                Some(bytes) => {
2310                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
2311                        InternalError::persisted_row_encode_failed(
2312                            "slot payload start exceeds u32 range",
2313                        )
2314                    })?;
2315                    let len = u32::try_from(bytes.len()).map_err(|_| {
2316                        InternalError::persisted_row_encode_failed(
2317                            "slot payload length exceeds u32 range",
2318                        )
2319                    })?;
2320                    payload_bytes.extend_from_slice(bytes);
2321                    slot_table.push((start, len));
2322                }
2323                None => slot_table.push((0_u32, 0_u32)),
2324            }
2325        }
2326
2327        encode_slot_payload_from_parts(slots.len(), slot_table.as_slice(), payload_bytes.as_slice())
2328    }
2329
2330    #[test]
2331    fn decode_slot_value_from_bytes_decodes_scalar_slots_through_one_owner() {
2332        let payload =
2333            encode_scalar_slot_value(ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")));
2334        let value =
2335            decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
2336
2337        assert_eq!(value, Value::Text("Ada".to_string()));
2338    }
2339
2340    #[test]
2341    fn decode_slot_value_from_bytes_reports_scalar_prefix_bytes() {
2342        let err = decode_slot_value_from_bytes(&TEST_MODEL, 0, &[0x00, 1])
2343            .expect_err("invalid scalar slot prefix should fail closed");
2344
2345        assert!(
2346            err.message
2347                .contains("expected slot envelope prefix byte 0xFF, found 0x00"),
2348            "unexpected error: {err:?}"
2349        );
2350    }
2351
2352    #[test]
2353    fn decode_slot_value_from_bytes_respects_value_storage_decode_contract() {
2354        let payload = crate::serialize::serialize(&Value::Text("Ada".to_string()))
2355            .expect("encode value-storage payload");
2356
2357        let value =
2358            decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
2359
2360        assert_eq!(value, Value::Text("Ada".to_string()));
2361    }
2362
2363    #[test]
2364    fn encode_slot_value_from_value_roundtrips_scalar_slots() {
2365        let payload = encode_slot_value_from_value(&TEST_MODEL, 0, &Value::Text("Ada".to_string()))
2366            .expect("encode slot");
2367        let decoded =
2368            decode_slot_value_from_bytes(&TEST_MODEL, 0, payload.as_slice()).expect("decode slot");
2369
2370        assert_eq!(decoded, Value::Text("Ada".to_string()));
2371    }
2372
2373    #[test]
2374    fn encode_slot_value_from_value_roundtrips_value_storage_slots() {
2375        let payload = encode_slot_value_from_value(&TEST_MODEL, 1, &Value::Text("Ada".to_string()))
2376            .expect("encode slot");
2377        let decoded =
2378            decode_slot_value_from_bytes(&TEST_MODEL, 1, payload.as_slice()).expect("decode slot");
2379
2380        assert_eq!(decoded, Value::Text("Ada".to_string()));
2381    }
2382
2383    #[test]
2384    fn encode_slot_value_from_value_roundtrips_list_by_kind_slots() {
2385        let payload = encode_slot_value_from_value(
2386            &LIST_MODEL,
2387            0,
2388            &Value::List(vec![Value::Text("alpha".to_string())]),
2389        )
2390        .expect("encode list slot");
2391        let decoded =
2392            decode_slot_value_from_bytes(&LIST_MODEL, 0, payload.as_slice()).expect("decode slot");
2393
2394        assert_eq!(decoded, Value::List(vec![Value::Text("alpha".to_string())]),);
2395    }
2396
2397    #[test]
2398    fn encode_slot_value_from_value_roundtrips_map_by_kind_slots() {
2399        let payload = encode_slot_value_from_value(
2400            &MAP_MODEL,
2401            0,
2402            &Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
2403        )
2404        .expect("encode map slot");
2405        let decoded =
2406            decode_slot_value_from_bytes(&MAP_MODEL, 0, payload.as_slice()).expect("decode slot");
2407
2408        assert_eq!(
2409            decoded,
2410            Value::Map(vec![(Value::Text("alpha".to_string()), Value::Uint(7))]),
2411        );
2412    }
2413
2414    #[test]
2415    fn encode_slot_value_from_value_roundtrips_enum_by_kind_slots() {
2416        let payload = encode_slot_value_from_value(
2417            &ENUM_MODEL,
2418            0,
2419            &Value::Enum(
2420                ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7)),
2421            ),
2422        )
2423        .expect("encode enum slot");
2424        let decoded =
2425            decode_slot_value_from_bytes(&ENUM_MODEL, 0, payload.as_slice()).expect("decode slot");
2426
2427        assert_eq!(
2428            decoded,
2429            Value::Enum(
2430                ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7,))
2431            ),
2432        );
2433    }
2434
2435    #[test]
2436    fn encode_slot_value_from_value_roundtrips_leaf_by_kind_wrapper_slots() {
2437        let account = Account::from_parts(Principal::dummy(7), Some(Subaccount::from([7_u8; 32])));
2438        let payload = encode_slot_value_from_value(&ACCOUNT_MODEL, 0, &Value::Account(account))
2439            .expect("encode account slot");
2440        let decoded = decode_slot_value_from_bytes(&ACCOUNT_MODEL, 0, payload.as_slice())
2441            .expect("decode slot");
2442
2443        assert_eq!(decoded, Value::Account(account));
2444    }
2445
2446    #[test]
2447    fn encode_slot_value_from_value_rejects_unknown_enum_payload_variants() {
2448        let err = encode_slot_value_from_value(
2449            &ENUM_MODEL,
2450            0,
2451            &Value::Enum(
2452                ValueEnum::new("Unknown", Some("tests::State")).with_payload(Value::Uint(7)),
2453            ),
2454        )
2455        .expect_err("unknown enum payload should fail closed");
2456
2457        assert!(err.message.contains("unknown enum variant"));
2458    }
2459
2460    #[test]
2461    fn structural_slot_reader_and_direct_decode_share_the_same_field_codec_boundary() {
2462        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2463        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2464            .expect("encode value-storage payload");
2465        writer
2466            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2467            .expect("write scalar slot");
2468        writer
2469            .write_slot(1, Some(payload.as_slice()))
2470            .expect("write value-storage slot");
2471        let raw_row = RawRow::try_new(
2472            serialize_row_payload(writer.finish().expect("finish slot payload"))
2473                .expect("serialize row payload"),
2474        )
2475        .expect("build raw row");
2476
2477        let direct_slots =
2478            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
2479        let mut cached_slots =
2480            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
2481
2482        let direct_name = decode_slot_value_by_contract(&direct_slots, 0).expect("decode name");
2483        let direct_payload =
2484            decode_slot_value_by_contract(&direct_slots, 1).expect("decode payload");
2485        let cached_name = cached_slots.get_value(0).expect("cached name");
2486        let cached_payload = cached_slots.get_value(1).expect("cached payload");
2487
2488        assert_eq!(direct_name, cached_name);
2489        assert_eq!(direct_payload, cached_payload);
2490    }
2491
2492    #[test]
2493    fn apply_update_patch_to_raw_row_updates_only_targeted_slots() {
2494        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2495        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2496            .expect("encode value-storage payload");
2497        writer
2498            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2499            .expect("write scalar slot");
2500        writer
2501            .write_slot(1, Some(payload.as_slice()))
2502            .expect("write value-storage slot");
2503        let raw_row = RawRow::try_new(
2504            serialize_row_payload(writer.finish().expect("finish slot payload"))
2505                .expect("serialize row payload"),
2506        )
2507        .expect("build raw row");
2508        let patch = UpdatePatch::new().set(
2509            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2510            Value::Text("Grace".to_string()),
2511        );
2512
2513        let patched =
2514            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
2515        let mut reader =
2516            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2517
2518        assert_eq!(
2519            reader.get_value(0).expect("decode slot"),
2520            Some(Value::Text("Grace".to_string()))
2521        );
2522        assert_eq!(
2523            reader.get_value(1).expect("decode slot"),
2524            Some(Value::Text("payload".to_string()))
2525        );
2526    }
2527
2528    #[test]
2529    fn serialize_update_patch_fields_encodes_canonical_slot_payloads() {
2530        let patch = UpdatePatch::new()
2531            .set(
2532                FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2533                Value::Text("Grace".to_string()),
2534            )
2535            .set(
2536                FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2537                Value::Text("payload".to_string()),
2538            );
2539
2540        let serialized =
2541            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2542
2543        assert_eq!(serialized.entries().len(), 2);
2544        assert_eq!(
2545            decode_slot_value_from_bytes(
2546                &TEST_MODEL,
2547                serialized.entries()[0].slot().index(),
2548                serialized.entries()[0].payload(),
2549            )
2550            .expect("decode slot payload"),
2551            Value::Text("Grace".to_string())
2552        );
2553        assert_eq!(
2554            decode_slot_value_from_bytes(
2555                &TEST_MODEL,
2556                serialized.entries()[1].slot().index(),
2557                serialized.entries()[1].payload(),
2558            )
2559            .expect("decode slot payload"),
2560            Value::Text("payload".to_string())
2561        );
2562    }
2563
2564    #[test]
2565    fn serialized_patch_writer_rejects_clear_slots() {
2566        let mut writer = SerializedPatchWriter::for_model(&TEST_MODEL);
2567
2568        let err = writer
2569            .write_slot(0, None)
2570            .expect_err("0.65 patch staging must reject missing-slot clears");
2571
2572        assert!(
2573            err.message
2574                .contains("serialized patch writer cannot clear slot 0"),
2575            "unexpected error: {err:?}"
2576        );
2577        assert!(
2578            err.message.contains(TEST_MODEL.path()),
2579            "unexpected error: {err:?}"
2580        );
2581    }
2582
2583    #[test]
2584    fn slot_buffer_writer_rejects_clear_slots() {
2585        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2586
2587        let err = writer
2588            .write_slot(0, None)
2589            .expect_err("canonical row staging must reject missing-slot clears");
2590
2591        assert!(
2592            err.message
2593                .contains("slot buffer writer cannot clear slot 0"),
2594            "unexpected error: {err:?}"
2595        );
2596        assert!(
2597            err.message.contains(TEST_MODEL.path()),
2598            "unexpected error: {err:?}"
2599        );
2600    }
2601
2602    #[test]
2603    fn apply_update_patch_to_raw_row_uses_last_write_wins() {
2604        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2605        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2606            .expect("encode value-storage payload");
2607        writer
2608            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2609            .expect("write scalar slot");
2610        writer
2611            .write_slot(1, Some(payload.as_slice()))
2612            .expect("write value-storage slot");
2613        let raw_row = RawRow::try_new(
2614            serialize_row_payload(writer.finish().expect("finish slot payload"))
2615                .expect("serialize row payload"),
2616        )
2617        .expect("build raw row");
2618        let slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
2619        let patch = UpdatePatch::new()
2620            .set(slot, Value::Text("Grace".to_string()))
2621            .set(slot, Value::Text("Lin".to_string()));
2622
2623        let patched =
2624            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
2625        let mut reader =
2626            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2627
2628        assert_eq!(
2629            reader.get_value(0).expect("decode slot"),
2630            Some(Value::Text("Lin".to_string()))
2631        );
2632    }
2633
2634    #[test]
2635    fn apply_update_patch_to_raw_row_rejects_noncanonical_missing_slot_baseline() {
2636        let empty_slots = vec![None::<&[u8]>; TEST_MODEL.fields().len()];
2637        let raw_row = RawRow::try_new(
2638            serialize_row_payload(
2639                encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, empty_slots.as_slice())
2640                    .expect("encode malformed slot payload"),
2641            )
2642            .expect("serialize row payload"),
2643        )
2644        .expect("build raw row");
2645        let patch = UpdatePatch::new().set(
2646            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2647            Value::Text("payload".to_string()),
2648        );
2649
2650        let err = apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch)
2651            .expect_err("noncanonical rows with missing slots must fail closed");
2652
2653        assert_eq!(
2654            err.message,
2655            "row decode failed: missing slot payload: slot=0"
2656        );
2657    }
2658
2659    #[test]
2660    fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_baseline() {
2661        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2662            .expect("encode value-storage payload");
2663        let malformed_slots = [Some([0xF6].as_slice()), Some(payload.as_slice())];
2664        let raw_row = RawRow::try_new(
2665            serialize_row_payload(
2666                encode_slot_payload_allowing_missing_for_tests(&TEST_MODEL, &malformed_slots)
2667                    .expect("encode malformed slot payload"),
2668            )
2669            .expect("serialize row payload"),
2670        )
2671        .expect("build raw row");
2672        let patch = UpdatePatch::new().set(
2673            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2674            Value::Text("patched".to_string()),
2675        );
2676        let serialized =
2677            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2678
2679        let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
2680            .expect_err("noncanonical scalar baseline must fail closed");
2681
2682        assert!(
2683            err.message.contains("field 'name'"),
2684            "unexpected error: {err:?}"
2685        );
2686        assert!(
2687            err.message
2688                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2689            "unexpected error: {err:?}"
2690        );
2691    }
2692
2693    #[test]
2694    fn apply_serialized_update_patch_to_raw_row_rejects_noncanonical_scalar_patch_payload() {
2695        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2696        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2697            .expect("encode value-storage payload");
2698        writer
2699            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2700            .expect("write scalar slot");
2701        writer
2702            .write_slot(1, Some(payload.as_slice()))
2703            .expect("write value-storage slot");
2704        let raw_row = RawRow::try_new(
2705            serialize_row_payload(writer.finish().expect("finish slot payload"))
2706                .expect("serialize row payload"),
2707        )
2708        .expect("build raw row");
2709        let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
2710            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2711            vec![0xF6],
2712        )]);
2713
2714        let err = apply_serialized_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &serialized)
2715            .expect_err("noncanonical serialized patch payload must fail closed");
2716
2717        assert!(
2718            err.message.contains("field 'name'"),
2719            "unexpected error: {err:?}"
2720        );
2721        assert!(
2722            err.message
2723                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2724            "unexpected error: {err:?}"
2725        );
2726    }
2727
2728    #[test]
2729    fn structural_slot_reader_rejects_slot_count_mismatch() {
2730        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2731        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2732            .expect("encode payload");
2733        writer
2734            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2735            .expect("write scalar slot");
2736        writer
2737            .write_slot(1, Some(payload.as_slice()))
2738            .expect("write payload slot");
2739        let mut payload = writer.finish().expect("finish slot payload");
2740        payload[..2].copy_from_slice(&1_u16.to_be_bytes());
2741        let raw_row =
2742            RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
2743                .expect("build raw row");
2744
2745        let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
2746            .err()
2747            .expect("slot-count drift must fail closed");
2748
2749        assert_eq!(
2750            err.message,
2751            "row decode failed: slot count mismatch: expected 2, found 1"
2752        );
2753    }
2754
2755    #[test]
2756    fn structural_slot_reader_rejects_slot_span_exceeds_payload_length() {
2757        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2758        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2759            .expect("encode payload");
2760        writer
2761            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2762            .expect("write scalar slot");
2763        writer
2764            .write_slot(1, Some(payload.as_slice()))
2765            .expect("write payload slot");
2766        let mut payload = writer.finish().expect("finish slot payload");
2767
2768        // Corrupt the second slot span so the payload table points past the
2769        // available data section.
2770        payload[14..18].copy_from_slice(&u32::MAX.to_be_bytes());
2771        let raw_row =
2772            RawRow::try_new(serialize_row_payload(payload).expect("serialize row payload"))
2773                .expect("build raw row");
2774
2775        let err = StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL)
2776            .err()
2777            .expect("slot span drift must fail closed");
2778
2779        assert_eq!(
2780            err.message,
2781            "row decode failed: slot span exceeds payload length"
2782        );
2783    }
2784
2785    #[test]
2786    fn apply_serialized_update_patch_to_raw_row_replays_preencoded_slots() {
2787        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2788        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2789            .expect("encode value-storage payload");
2790        writer
2791            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
2792            .expect("write scalar slot");
2793        writer
2794            .write_slot(1, Some(payload.as_slice()))
2795            .expect("write value-storage slot");
2796        let raw_row = RawRow::try_new(
2797            serialize_row_payload(writer.finish().expect("finish slot payload"))
2798                .expect("serialize row payload"),
2799        )
2800        .expect("build raw row");
2801        let patch = UpdatePatch::new().set(
2802            FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2803            Value::Text("Grace".to_string()),
2804        );
2805        let serialized =
2806            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
2807
2808        let patched = raw_row
2809            .apply_serialized_update_patch(&TEST_MODEL, &serialized)
2810            .expect("apply serialized patch");
2811        let mut reader =
2812            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");
2813
2814        assert_eq!(
2815            reader.get_value(0).expect("decode slot"),
2816            Some(Value::Text("Grace".to_string()))
2817        );
2818    }
2819
2820    #[test]
2821    fn serialize_entity_slots_as_update_patch_replays_full_typed_after_image() {
2822        let old_entity = PersistedRowPatchBridgeEntity {
2823            id: crate::types::Ulid::from_u128(7),
2824            name: "Ada".to_string(),
2825        };
2826        let new_entity = PersistedRowPatchBridgeEntity {
2827            id: crate::types::Ulid::from_u128(7),
2828            name: "Grace".to_string(),
2829        };
2830        let raw_row = RawRow::from_entity(&old_entity).expect("encode old row");
2831        let old_decoded = raw_row
2832            .try_decode::<PersistedRowPatchBridgeEntity>()
2833            .expect("decode old entity");
2834        let serialized =
2835            serialize_entity_slots_as_update_patch(&new_entity).expect("serialize entity patch");
2836        let direct =
2837            RawRow::from_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
2838                .expect("direct row emission should succeed");
2839
2840        let patched = raw_row
2841            .apply_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
2842            .expect("apply serialized patch");
2843        let decoded = patched
2844            .try_decode::<PersistedRowPatchBridgeEntity>()
2845            .expect("decode patched entity");
2846
2847        assert_eq!(
2848            direct, patched,
2849            "fresh row emission and replayed full-image patch must converge on identical bytes",
2850        );
2851        assert_eq!(old_decoded, old_entity);
2852        assert_eq!(decoded, new_entity);
2853    }
2854
2855    #[test]
2856    fn canonical_row_from_raw_row_replays_canonical_full_image_bytes() {
2857        let entity = PersistedRowPatchBridgeEntity {
2858            id: crate::types::Ulid::from_u128(11),
2859            name: "Ada".to_string(),
2860        };
2861        let raw_row = RawRow::from_entity(&entity).expect("encode canonical row");
2862        let canonical = crate::db::data::canonical_row_from_raw_row(
2863            PersistedRowPatchBridgeEntity::MODEL,
2864            &raw_row,
2865        )
2866        .expect("canonical re-emission should succeed");
2867
2868        assert_eq!(
2869            canonical.as_bytes(),
2870            raw_row.as_bytes(),
2871            "canonical raw-row rebuild must preserve already canonical row bytes",
2872        );
2873    }
2874
2875    #[test]
2876    fn canonical_row_from_raw_row_rejects_noncanonical_scalar_payload() {
2877        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2878            .expect("encode value-storage payload");
2879        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
2880        writer
2881            .write_slot(0, Some(&[0xF6]))
2882            .expect("write malformed scalar slot");
2883        writer
2884            .write_slot(1, Some(payload.as_slice()))
2885            .expect("write value-storage slot");
2886        let raw_row = RawRow::try_new(
2887            serialize_row_payload(writer.finish().expect("finish slot payload"))
2888                .expect("serialize malformed row"),
2889        )
2890        .expect("build malformed raw row");
2891
2892        let err = crate::db::data::canonical_row_from_raw_row(&TEST_MODEL, &raw_row)
2893            .expect_err("canonical raw-row rebuild must reject malformed scalar payloads");
2894
2895        assert!(
2896            err.message.contains("field 'name'"),
2897            "unexpected error: {err:?}"
2898        );
2899        assert!(
2900            err.message
2901                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2902            "unexpected error: {err:?}"
2903        );
2904    }
2905
2906    #[test]
2907    fn raw_row_from_serialized_update_patch_rejects_noncanonical_scalar_payload() {
2908        let payload = crate::serialize::serialize(&Value::Text("payload".to_string()))
2909            .expect("encode value-storage payload");
2910        let serialized = SerializedUpdatePatch::new(vec![
2911            SerializedFieldUpdate::new(
2912                FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot"),
2913                vec![0xF6],
2914            ),
2915            SerializedFieldUpdate::new(
2916                FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2917                payload,
2918            ),
2919        ]);
2920
2921        let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
2922            .expect_err("fresh row emission must reject noncanonical serialized patch payloads");
2923
2924        assert!(
2925            err.message.contains("field 'name'"),
2926            "unexpected error: {err:?}"
2927        );
2928        assert!(
2929            err.message
2930                .contains("expected slot envelope prefix byte 0xFF, found 0xF6"),
2931            "unexpected error: {err:?}"
2932        );
2933    }
2934
2935    #[test]
2936    fn raw_row_from_serialized_update_patch_rejects_incomplete_slot_image() {
2937        let serialized = SerializedUpdatePatch::new(vec![SerializedFieldUpdate::new(
2938            FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot"),
2939            crate::serialize::serialize(&Value::Text("payload".to_string()))
2940                .expect("encode value-storage payload"),
2941        )]);
2942
2943        let err = RawRow::from_serialized_update_patch(&TEST_MODEL, &serialized)
2944            .expect_err("fresh row emission must reject missing declared slots");
2945
2946        assert!(
2947            err.message.contains("serialized patch did not emit slot 0"),
2948            "unexpected error: {err:?}"
2949        );
2950    }
2951}