Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::{
9        codec::serialize_row_payload,
10        data::{
11            DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
12            decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
13            decode_structural_value_storage_bytes,
14        },
15        scalar_expr::compile_scalar_literal_expr_value,
16        schema::{field_type_from_model_kind, literal_matches_type},
17    },
18    error::InternalError,
19    model::{
20        entity::{EntityModel, resolve_field_slot, resolve_primary_key_slot},
21        field::{FieldKind, FieldModel, FieldStorageDecode, LeafCodec, ScalarCodec},
22    },
23    serialize::{deserialize, serialize},
24    traits::EntityKind,
25    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
26    value::{StorageKey, Value, ValueEnum},
27};
28use serde_cbor::{Value as CborValue, value::to_value as to_cbor_value};
29use std::{cmp::Ordering, collections::BTreeMap, str};
30
// Scalar slot envelope constants.
//
// NOTE(review): the scalar slot encoder/decoder that consume these values is
// not visible in this part of the file, so the per-constant notes below are
// inferred from the names only — confirm against `encode_scalar_slot_value`
// and `decode_scalar_slot_value`.

// Presumably the leading marker byte of a scalar slot payload — TODO confirm.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
// Presumably the tag byte for a null scalar slot — TODO confirm.
const SCALAR_SLOT_TAG_NULL: u8 = 0;
// Presumably the tag byte for a scalar slot that carries a value — TODO confirm.
const SCALAR_SLOT_TAG_VALUE: u8 = 1;

// Payload lengths in bytes, presumably used to validate fixed-width scalar
// payloads during decode — TODO confirm.
const SCALAR_BOOL_PAYLOAD_LEN: usize = 1;
const SCALAR_WORD32_PAYLOAD_LEN: usize = 4;
const SCALAR_WORD64_PAYLOAD_LEN: usize = 8;
const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// Presumably the single payload byte used for `false` / `true` — TODO confirm.
const SCALAR_BOOL_FALSE_TAG: u8 = 0;
const SCALAR_BOOL_TRUE_TAG: u8 = 1;
43
44///
45/// FieldSlot
46///
47/// FieldSlot
48///
49/// FieldSlot is the structural stable slot reference used by the `0.64`
50/// patching path.
51/// It intentionally carries only the model-local slot index so field-level
52/// mutation stays structural instead of reintroducing typed entity helpers.
53///
54
55#[allow(dead_code)]
56#[derive(Clone, Copy, Debug, Eq, PartialEq)]
57pub(in crate::db) struct FieldSlot {
58    index: usize,
59}
60
61#[allow(dead_code)]
62impl FieldSlot {
63    /// Resolve one stable field slot by runtime field name.
64    #[must_use]
65    pub(in crate::db) fn resolve(model: &'static EntityModel, field_name: &str) -> Option<Self> {
66        resolve_field_slot(model, field_name).map(|index| Self { index })
67    }
68
69    /// Build one stable field slot from an already validated index.
70    pub(in crate::db) fn from_index(
71        model: &'static EntityModel,
72        index: usize,
73    ) -> Result<Self, InternalError> {
74        field_model_for_slot(model, index)?;
75
76        Ok(Self { index })
77    }
78
79    /// Return the stable slot index inside `EntityModel::fields`.
80    #[must_use]
81    pub(in crate::db) const fn index(self) -> usize {
82        self.index
83    }
84}
85
86///
87/// FieldUpdate
88///
89/// FieldUpdate
90///
91/// FieldUpdate carries one ordered field-level mutation over the structural
92/// persisted-row boundary.
93/// `UpdatePatch` applies these entries in order and last write wins for the
94/// same slot.
95///
96
97#[allow(dead_code)]
98#[derive(Clone, Debug, Eq, PartialEq)]
99pub(in crate::db) struct FieldUpdate {
100    slot: FieldSlot,
101    value: Value,
102}
103
104#[allow(dead_code)]
105impl FieldUpdate {
106    /// Build one field-level structural update.
107    #[must_use]
108    pub(in crate::db) const fn new(slot: FieldSlot, value: Value) -> Self {
109        Self { slot, value }
110    }
111
112    /// Return the stable target slot.
113    #[must_use]
114    pub(in crate::db) const fn slot(&self) -> FieldSlot {
115        self.slot
116    }
117
118    /// Return the runtime value payload for this update.
119    #[must_use]
120    pub(in crate::db) const fn value(&self) -> &Value {
121        &self.value
122    }
123}
124
125///
126/// UpdatePatch
127///
128/// UpdatePatch
129///
130/// UpdatePatch is the ordered structural mutation program applied to one
131/// persisted row.
132/// This is the phase-1 `0.64` patch container: it updates slot values
133/// structurally and then re-encodes the full row.
134///
135
136#[derive(Clone, Debug, Default, Eq, PartialEq)]
137pub struct UpdatePatch {
138    entries: Vec<FieldUpdate>,
139}
140
141impl UpdatePatch {
142    /// Build one empty patch.
143    #[must_use]
144    pub const fn new() -> Self {
145        Self {
146            entries: Vec::new(),
147        }
148    }
149
150    /// Append one structural field update in declaration order.
151    #[must_use]
152    pub(in crate::db) fn set(mut self, slot: FieldSlot, value: Value) -> Self {
153        self.entries.push(FieldUpdate::new(slot, value));
154        self
155    }
156
157    /// Resolve one field name and append its structural update.
158    pub fn set_field(
159        self,
160        model: &'static EntityModel,
161        field_name: &str,
162        value: Value,
163    ) -> Result<Self, InternalError> {
164        let Some(slot) = FieldSlot::resolve(model, field_name) else {
165            return Err(InternalError::mutation_structural_field_unknown(
166                model.path(),
167                field_name,
168            ));
169        };
170
171        Ok(self.set(slot, value))
172    }
173
174    /// Borrow the ordered field updates carried by this patch.
175    #[must_use]
176    pub(in crate::db) const fn entries(&self) -> &[FieldUpdate] {
177        self.entries.as_slice()
178    }
179
180    /// Return whether this patch carries no field updates.
181    #[must_use]
182    pub(in crate::db) const fn is_empty(&self) -> bool {
183        self.entries.is_empty()
184    }
185}
186
187///
188/// SerializedFieldUpdate
189///
190/// SerializedFieldUpdate
191///
192/// SerializedFieldUpdate carries one ordered field-level mutation after the
193/// owning persisted-row field codec has already lowered the runtime `Value`
194/// into canonical slot payload bytes.
195/// This lets later patch-application stages consume one mechanical slot-patch
196/// artifact instead of rebuilding per-field encode dispatch.
197///
198
199#[allow(dead_code)]
200#[derive(Clone, Debug, Eq, PartialEq)]
201pub(in crate::db) struct SerializedFieldUpdate {
202    slot: FieldSlot,
203    payload: Option<Vec<u8>>,
204}
205
206#[allow(dead_code)]
207impl SerializedFieldUpdate {
208    /// Build one serialized structural field update.
209    #[must_use]
210    pub(in crate::db) const fn new(slot: FieldSlot, payload: Option<Vec<u8>>) -> Self {
211        Self { slot, payload }
212    }
213
214    /// Return the stable target slot.
215    #[must_use]
216    pub(in crate::db) const fn slot(&self) -> FieldSlot {
217        self.slot
218    }
219
220    /// Borrow the canonical slot payload bytes for this update when present.
221    #[must_use]
222    pub(in crate::db) fn payload(&self) -> Option<&[u8]> {
223        self.payload.as_deref()
224    }
225}
226
227///
228/// SerializedUpdatePatch
229///
230/// SerializedUpdatePatch
231///
232/// SerializedUpdatePatch is the canonical serialized form of `UpdatePatch`
233/// over persisted-row slot payload bytes.
234/// This is the structural patch artifact later write-path stages can stage or
235/// replay without re-entering field-contract encode logic.
236///
237
238#[allow(dead_code)]
239#[derive(Clone, Debug, Default, Eq, PartialEq)]
240pub(in crate::db) struct SerializedUpdatePatch {
241    entries: Vec<SerializedFieldUpdate>,
242}
243
244#[allow(dead_code)]
245impl SerializedUpdatePatch {
246    /// Build one serialized patch from already encoded slot payloads.
247    #[must_use]
248    pub(in crate::db) const fn new(entries: Vec<SerializedFieldUpdate>) -> Self {
249        Self { entries }
250    }
251
252    /// Borrow the ordered serialized field updates carried by this patch.
253    #[must_use]
254    pub(in crate::db) const fn entries(&self) -> &[SerializedFieldUpdate] {
255        self.entries.as_slice()
256    }
257
258    /// Return whether this serialized patch carries no field updates.
259    #[must_use]
260    pub(in crate::db) const fn is_empty(&self) -> bool {
261        self.entries.is_empty()
262    }
263}
264
///
/// SlotReader
///
/// SlotReader exposes one persisted row as stable slot-addressable fields.
/// Callers may inspect field presence, borrow raw field bytes, or decode one
/// field value on demand.
/// Every accessor addresses a field by its `usize` slot index.
///

pub trait SlotReader {
    /// Return the structural model that owns this slot mapping.
    fn model(&self) -> &'static EntityModel;

    /// Return whether the given slot is present in the persisted row.
    fn has(&self, slot: usize) -> bool;

    /// Borrow the raw persisted payload for one slot when present.
    ///
    /// `None` means the reader has no bytes for `slot`.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;

    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
    ///
    /// # Errors
    /// Implementations report decode problems as `InternalError`.
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;

    /// Decode one slot value on demand using the field contract declared by the model.
    ///
    /// Unlike the other accessors this takes `&mut self` — presumably so
    /// implementations can cache decoded values; confirm against implementors.
    ///
    /// # Errors
    /// Implementations report decode problems as `InternalError`.
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
}
289
290///
291/// SlotWriter
292///
293/// SlotWriter is the canonical row-container output seam used by persisted-row
294/// writers.
295///
296
297pub trait SlotWriter {
298    /// Record one slot payload for the current row.
299    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
300
301    /// Record one scalar slot payload using the canonical scalar leaf envelope.
302    fn write_scalar(
303        &mut self,
304        slot: usize,
305        value: ScalarSlotValueRef<'_>,
306    ) -> Result<(), InternalError> {
307        let payload = encode_scalar_slot_value(value);
308
309        self.write_slot(slot, Some(payload.as_slice()))
310    }
311}
312
313///
314/// PersistedRow
315///
316/// PersistedRow is the derive-owned bridge between typed entities and
317/// slot-addressable persisted rows.
318/// It owns entity-specific materialization/default semantics while runtime
319/// paths stay structural at the row boundary.
320///
321
322pub trait PersistedRow: EntityKind + Sized {
323    /// Materialize one typed entity from one slot reader.
324    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
325
326    /// Write one typed entity into one slot writer.
327    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
328
329    /// Decode one slot value needed by structural planner/projection consumers.
330    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
331    where
332        Self: crate::traits::FieldProjection,
333    {
334        let entity = Self::materialize_from_slots(slots)?;
335
336        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
337    }
338}
339
340/// Decode one slot value through the declared field contract without routing
341/// through `SlotReader::get_value`.
342pub(in crate::db) fn decode_slot_value_by_contract(
343    slots: &dyn SlotReader,
344    slot: usize,
345) -> Result<Option<Value>, InternalError> {
346    let Some(raw_value) = slots.get_bytes(slot) else {
347        return Ok(None);
348    };
349
350    decode_slot_value_from_bytes(slots.model(), slot, raw_value).map(Some)
351}
352
353/// Decode one structural slot payload using the owning model field contract.
354///
355/// This is the canonical field-level decode boundary for persisted-row bytes.
356/// Higher-level row readers may still cache decoded values, but they should not
357/// rebuild scalar-vs-CBOR field dispatch themselves.
358pub(in crate::db) fn decode_slot_value_from_bytes(
359    model: &'static EntityModel,
360    slot: usize,
361    raw_value: &[u8],
362) -> Result<Value, InternalError> {
363    let field = field_model_for_slot(model, slot)?;
364
365    match field.leaf_codec() {
366        LeafCodec::Scalar(codec) => match decode_scalar_slot_value(raw_value, codec, field.name())?
367        {
368            ScalarSlotValueRef::Null => Ok(Value::Null),
369            ScalarSlotValueRef::Value(value) => Ok(value.into_value()),
370        },
371        LeafCodec::CborFallback => decode_non_scalar_slot_value(raw_value, field),
372    }
373}
374
375/// Encode one structural slot value using the owning model field contract.
376///
377/// This is the initial `0.64` write-side field-codec boundary. It currently
378/// covers:
379/// - scalar leaf slots
380/// - `FieldStorageDecode::Value` slots
381///
382/// Composite `ByKind` field encoding remains a follow-up slice so the runtime
383/// can add one structural encoder owner instead of quietly rebuilding typed
384/// per-field branches.
385#[allow(dead_code)]
386pub(in crate::db) fn encode_slot_value_from_value(
387    model: &'static EntityModel,
388    slot: usize,
389    value: &Value,
390) -> Result<Vec<u8>, InternalError> {
391    let field = field_model_for_slot(model, slot)?;
392    ensure_slot_value_matches_field_contract(field, value)?;
393
394    match field.storage_decode() {
395        FieldStorageDecode::Value => serialize(value)
396            .map_err(|err| InternalError::persisted_row_field_encode_failed(field.name(), err)),
397        FieldStorageDecode::ByKind => match field.leaf_codec() {
398            LeafCodec::Scalar(_) => {
399                let scalar = compile_scalar_literal_expr_value(value).ok_or_else(|| {
400                    InternalError::persisted_row_field_encode_failed(
401                        field.name(),
402                        format!(
403                            "field kind {:?} requires a scalar runtime value, found {value:?}",
404                            field.kind()
405                        ),
406                    )
407                })?;
408
409                Ok(encode_scalar_slot_value(scalar.as_slot_value_ref()))
410            }
411            LeafCodec::CborFallback => {
412                encode_structural_field_bytes_by_kind(field.kind(), value, field.name())
413            }
414        },
415    }
416}
417
418/// Apply one ordered structural patch to one raw row using the current
419/// persisted-row field codec authority.
420#[allow(dead_code)]
421pub(in crate::db) fn apply_update_patch_to_raw_row(
422    model: &'static EntityModel,
423    raw_row: &RawRow,
424    patch: &UpdatePatch,
425) -> Result<RawRow, InternalError> {
426    let serialized_patch = serialize_update_patch_fields(model, patch)?;
427
428    apply_serialized_update_patch_to_raw_row(model, raw_row, &serialized_patch)
429}
430
431/// Serialize one ordered structural patch into canonical slot payload bytes.
432///
433/// This is the phase-1 partial-serialization seam for `0.64`: later mutation
434/// stages can stage or replay one field patch without rebuilding the runtime
435/// value-to-bytes contract per consumer.
436#[allow(dead_code)]
437pub(in crate::db) fn serialize_update_patch_fields(
438    model: &'static EntityModel,
439    patch: &UpdatePatch,
440) -> Result<SerializedUpdatePatch, InternalError> {
441    if patch.is_empty() {
442        return Ok(SerializedUpdatePatch::default());
443    }
444
445    let mut entries = Vec::with_capacity(patch.entries().len());
446
447    // Phase 1: validate and encode each ordered field update through the
448    // canonical slot codec owner.
449    for entry in patch.entries() {
450        let slot = entry.slot();
451        let payload = encode_slot_value_from_value(model, slot.index(), entry.value())?;
452        entries.push(SerializedFieldUpdate::new(slot, Some(payload)));
453    }
454
455    Ok(SerializedUpdatePatch::new(entries))
456}
457
458/// Serialize one full typed entity image into the canonical serialized patch
459/// artifact used by row-boundary patch replay.
460///
461/// This keeps typed save/update APIs on the existing surface while moving the
462/// actual after-image staging onto the structural slot-patch boundary.
463#[allow(dead_code)]
464pub(in crate::db) fn serialize_entity_slots_as_update_patch<E>(
465    entity: &E,
466) -> Result<SerializedUpdatePatch, InternalError>
467where
468    E: PersistedRow,
469{
470    let mut writer = SerializedPatchWriter::for_model(E::MODEL);
471
472    // Phase 1: let the derive-owned persisted-row writer emit the complete
473    // structural slot image for this entity.
474    entity.write_slots(&mut writer)?;
475
476    // Phase 2: require a dense slot image so save/update replay remains
477    // equivalent to the existing full-row write semantics.
478    writer.finish_complete()
479}
480
481/// Apply one serialized structural patch to one raw row.
482///
483/// This mechanical replay step no longer owns any `Value -> bytes` dispatch.
484/// It only replays already encoded slot payloads over the current row layout.
485#[allow(dead_code)]
486pub(in crate::db) fn apply_serialized_update_patch_to_raw_row(
487    model: &'static EntityModel,
488    raw_row: &RawRow,
489    patch: &SerializedUpdatePatch,
490) -> Result<RawRow, InternalError> {
491    if patch.is_empty() {
492        return Ok(raw_row.clone());
493    }
494
495    let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
496        .map_err(StructuralRowDecodeError::into_internal_error)?;
497    let patch_payloads = serialized_patch_payload_by_slot(model, patch)?;
498    let mut writer = SlotBufferWriter::for_model(model);
499
500    // Phase 1: replay the current row layout slot-by-slot, overriding only the
501    // patched fields with their already serialized payload bytes.
502    for (slot, patch_payload) in patch_payloads.iter().enumerate() {
503        match patch_payload {
504            Some(SerializedSlotPatchRef::Set(payload)) => {
505                writer.write_slot(slot, Some(payload))?;
506            }
507            Some(SerializedSlotPatchRef::Clear) => {
508                writer.write_slot(slot, None)?;
509            }
510            None => {
511                writer.write_slot(slot, field_bytes.field(slot))?;
512            }
513        }
514    }
515
516    // Phase 2: wrap the new slot payload bytes back into the canonical row
517    // envelope.
518    let payload = writer.finish()?;
519    let encoded = serialize_row_payload(payload)?;
520
521    RawRow::try_new(encoded).map_err(InternalError::from)
522}
523
524// Decode one non-scalar slot through the exact persisted contract declared by
525// the field model.
526fn decode_non_scalar_slot_value(
527    raw_value: &[u8],
528    field: &FieldModel,
529) -> Result<Value, InternalError> {
530    let decoded = match field.storage_decode() {
531        crate::model::field::FieldStorageDecode::ByKind => {
532            decode_structural_field_by_kind_bytes(raw_value, field.kind())
533        }
534        crate::model::field::FieldStorageDecode::Value => {
535            decode_structural_value_storage_bytes(raw_value)
536        }
537    };
538
539    decoded.map_err(|err| {
540        InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
541    })
542}
543
544// Validate one runtime value against the persisted field contract before field-
545// level structural encoding writes bytes into a row slot.
546#[allow(dead_code)]
547fn ensure_slot_value_matches_field_contract(
548    field: &FieldModel,
549    value: &Value,
550) -> Result<(), InternalError> {
551    if matches!(value, Value::Null) {
552        return Ok(());
553    }
554
555    if matches!(field.kind(), FieldKind::Structured { queryable: false })
556        && matches!(field.storage_decode(), FieldStorageDecode::Value)
557    {
558        return ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value);
559    }
560
561    let field_type = field_type_from_model_kind(&field.kind());
562    if !literal_matches_type(value, &field_type) {
563        return Err(InternalError::persisted_row_field_encode_failed(
564            field.name(),
565            format!(
566                "field kind {:?} does not accept runtime value {value:?}",
567                field.kind()
568            ),
569        ));
570    }
571
572    ensure_decimal_scale_matches(field.name(), field.kind(), value)?;
573    ensure_value_is_deterministic_for_storage(field.name(), field.kind(), value)
574}
575
576// Enforce fixed decimal scales through nested collection/map shapes before a
577// field-level patch value is persisted.
578#[allow(dead_code)]
579fn ensure_decimal_scale_matches(
580    field_name: &str,
581    kind: FieldKind,
582    value: &Value,
583) -> Result<(), InternalError> {
584    if matches!(value, Value::Null) {
585        return Ok(());
586    }
587
588    match (kind, value) {
589        (FieldKind::Decimal { scale }, Value::Decimal(decimal)) => {
590            if decimal.scale() != scale {
591                return Err(InternalError::persisted_row_field_encode_failed(
592                    field_name,
593                    format!(
594                        "decimal scale mismatch: expected {scale}, found {}",
595                        decimal.scale()
596                    ),
597                ));
598            }
599
600            Ok(())
601        }
602        (FieldKind::Relation { key_kind, .. }, value) => {
603            ensure_decimal_scale_matches(field_name, *key_kind, value)
604        }
605        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
606            for item in items {
607                ensure_decimal_scale_matches(field_name, *inner, item)?;
608            }
609
610            Ok(())
611        }
612        (
613            FieldKind::Map {
614                key,
615                value: map_value,
616            },
617            Value::Map(entries),
618        ) => {
619            for (entry_key, entry_value) in entries {
620                ensure_decimal_scale_matches(field_name, *key, entry_key)?;
621                ensure_decimal_scale_matches(field_name, *map_value, entry_value)?;
622            }
623
624            Ok(())
625        }
626        _ => Ok(()),
627    }
628}
629
630// Enforce the canonical persisted ordering rules for set/map shapes before one
631// field-level patch value becomes row bytes.
632#[allow(dead_code)]
633fn ensure_value_is_deterministic_for_storage(
634    field_name: &str,
635    kind: FieldKind,
636    value: &Value,
637) -> Result<(), InternalError> {
638    match (kind, value) {
639        (FieldKind::Set(_), Value::List(items)) => {
640            for pair in items.windows(2) {
641                let [left, right] = pair else {
642                    continue;
643                };
644                if Value::canonical_cmp(left, right) != Ordering::Less {
645                    return Err(InternalError::persisted_row_field_encode_failed(
646                        field_name,
647                        "set payload must already be canonical and deduplicated",
648                    ));
649                }
650            }
651
652            Ok(())
653        }
654        (FieldKind::Map { .. }, Value::Map(entries)) => {
655            Value::validate_map_entries(entries.as_slice())
656                .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?;
657
658            if !Value::map_entries_are_strictly_canonical(entries.as_slice()) {
659                return Err(InternalError::persisted_row_field_encode_failed(
660                    field_name,
661                    "map payload must already be canonical and deduplicated",
662                ));
663            }
664
665            Ok(())
666        }
667        _ => Ok(()),
668    }
669}
670
671// Materialize the last-write-wins serialized patch view indexed by stable slot.
672fn serialized_patch_payload_by_slot<'a>(
673    model: &'static EntityModel,
674    patch: &'a SerializedUpdatePatch,
675) -> Result<Vec<Option<SerializedSlotPatchRef<'a>>>, InternalError> {
676    let mut payloads = vec![None; model.fields().len()];
677
678    for entry in patch.entries() {
679        let slot = entry.slot().index();
680        field_model_for_slot(model, slot)?;
681        payloads[slot] = Some(match entry.payload() {
682            Some(payload) => SerializedSlotPatchRef::Set(payload),
683            None => SerializedSlotPatchRef::Clear,
684        });
685    }
686
687    Ok(payloads)
688}
689
///
/// SerializedSlotPatchRef
///
/// SerializedSlotPatchRef is the borrowed replay view used while applying one
/// serialized patch over an existing row layout.
/// It preserves the distinction between "set these bytes" and "clear this
/// slot" without forcing row replay to reason about runtime `Value`s.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SerializedSlotPatchRef<'a> {
    /// Write the slot with no payload.
    Clear,
    /// Write the slot with these already encoded payload bytes.
    Set(&'a [u8]),
}
706
707// Encode one `ByKind` field payload into the raw CBOR shape expected by the
708// structural field decoder.
709fn encode_structural_field_bytes_by_kind(
710    kind: FieldKind,
711    value: &Value,
712    field_name: &str,
713) -> Result<Vec<u8>, InternalError> {
714    let cbor_value = encode_structural_field_cbor_by_kind(kind, value, field_name)?;
715
716    serialize(&cbor_value)
717        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
718}
719
720// Encode one `ByKind` field payload into its raw CBOR value form.
721fn encode_structural_field_cbor_by_kind(
722    kind: FieldKind,
723    value: &Value,
724    field_name: &str,
725) -> Result<CborValue, InternalError> {
726    match (kind, value) {
727        (_, Value::Null) => Ok(CborValue::Null),
728        (FieldKind::Blob, Value::Blob(value)) => Ok(CborValue::Bytes(value.clone())),
729        (FieldKind::Bool, Value::Bool(value)) => Ok(CborValue::Bool(*value)),
730        (FieldKind::Text, Value::Text(value)) => Ok(CborValue::Text(value.clone())),
731        (FieldKind::Int, Value::Int(value)) => Ok(CborValue::Integer(i128::from(*value))),
732        (FieldKind::Uint, Value::Uint(value)) => Ok(CborValue::Integer(i128::from(*value))),
733        (FieldKind::Float32, Value::Float32(value)) => to_cbor_value(value)
734            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
735        (FieldKind::Float64, Value::Float64(value)) => to_cbor_value(value)
736            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
737        (FieldKind::Int128, Value::Int128(value)) => to_cbor_value(value)
738            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
739        (FieldKind::Uint128, Value::Uint128(value)) => to_cbor_value(value)
740            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err)),
741        (FieldKind::Ulid, Value::Ulid(value)) => Ok(CborValue::Text(value.to_string())),
742        (FieldKind::Account, Value::Account(value)) => encode_leaf_cbor_value(value, field_name),
743        (FieldKind::Date, Value::Date(value)) => encode_leaf_cbor_value(value, field_name),
744        (FieldKind::Decimal { .. }, Value::Decimal(value)) => {
745            encode_leaf_cbor_value(value, field_name)
746        }
747        (FieldKind::Duration, Value::Duration(value)) => encode_leaf_cbor_value(value, field_name),
748        (FieldKind::IntBig, Value::IntBig(value)) => encode_leaf_cbor_value(value, field_name),
749        (FieldKind::Principal, Value::Principal(value)) => {
750            encode_leaf_cbor_value(value, field_name)
751        }
752        (FieldKind::Subaccount, Value::Subaccount(value)) => {
753            encode_leaf_cbor_value(value, field_name)
754        }
755        (FieldKind::Timestamp, Value::Timestamp(value)) => {
756            encode_leaf_cbor_value(value, field_name)
757        }
758        (FieldKind::UintBig, Value::UintBig(value)) => encode_leaf_cbor_value(value, field_name),
759        (FieldKind::Unit, Value::Unit) => encode_leaf_cbor_value(&(), field_name),
760        (FieldKind::Relation { key_kind, .. }, value) => {
761            encode_structural_field_cbor_by_kind(*key_kind, value, field_name)
762        }
763        (FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => {
764            Ok(CborValue::Array(
765                items
766                    .iter()
767                    .map(|item| encode_structural_field_cbor_by_kind(*inner, item, field_name))
768                    .collect::<Result<Vec<_>, _>>()?,
769            ))
770        }
771        (FieldKind::Map { key, value }, Value::Map(entries)) => {
772            let mut encoded = BTreeMap::new();
773            for (entry_key, entry_value) in entries {
774                encoded.insert(
775                    encode_structural_field_cbor_by_kind(*key, entry_key, field_name)?,
776                    encode_structural_field_cbor_by_kind(*value, entry_value, field_name)?,
777                );
778            }
779
780            Ok(CborValue::Map(encoded))
781        }
782        (FieldKind::Enum { path, variants }, Value::Enum(value)) => {
783            encode_enum_cbor_value(path, variants, value, field_name)
784        }
785        (FieldKind::Structured { .. }, _) => Err(InternalError::persisted_row_field_encode_failed(
786            field_name,
787            "structured ByKind field encoding is unsupported",
788        )),
789        _ => Err(InternalError::persisted_row_field_encode_failed(
790            field_name,
791            format!("field kind {kind:?} does not accept runtime value {value:?}"),
792        )),
793    }
794}
795
796// Encode one typed leaf wrapper into its raw CBOR value form.
797fn encode_leaf_cbor_value<T>(value: &T, field_name: &str) -> Result<CborValue, InternalError>
798where
799    T: serde::Serialize,
800{
801    to_cbor_value(value)
802        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
803}
804
805// Encode one enum field using the same unit-vs-one-entry-map envelope expected
806// by structural enum decode.
807fn encode_enum_cbor_value(
808    path: &'static str,
809    variants: &'static [crate::model::field::EnumVariantModel],
810    value: &ValueEnum,
811    field_name: &str,
812) -> Result<CborValue, InternalError> {
813    if let Some(actual_path) = value.path()
814        && actual_path != path
815    {
816        return Err(InternalError::persisted_row_field_encode_failed(
817            field_name,
818            format!("enum path mismatch: expected '{path}', found '{actual_path}'"),
819        ));
820    }
821
822    let Some(payload) = value.payload() else {
823        return Ok(CborValue::Text(value.variant().to_string()));
824    };
825
826    let Some(variant_model) = variants.iter().find(|item| item.ident() == value.variant()) else {
827        return Err(InternalError::persisted_row_field_encode_failed(
828            field_name,
829            format!(
830                "unknown enum variant '{}' for path '{path}'",
831                value.variant()
832            ),
833        ));
834    };
835    let Some(payload_kind) = variant_model.payload_kind() else {
836        return Err(InternalError::persisted_row_field_encode_failed(
837            field_name,
838            format!(
839                "enum variant '{}' does not accept a payload",
840                value.variant()
841            ),
842        ));
843    };
844
845    let payload_value = match variant_model.payload_storage_decode() {
846        FieldStorageDecode::ByKind => {
847            encode_structural_field_cbor_by_kind(*payload_kind, payload, field_name)?
848        }
849        FieldStorageDecode::Value => to_cbor_value(payload)
850            .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))?,
851    };
852
853    let mut encoded = BTreeMap::new();
854    encoded.insert(CborValue::Text(value.variant().to_string()), payload_value);
855
856    Ok(CborValue::Map(encoded))
857}
858
859// Resolve one field model entry by stable slot index.
860fn field_model_for_slot(
861    model: &'static EntityModel,
862    slot: usize,
863) -> Result<&'static FieldModel, InternalError> {
864    model
865        .fields()
866        .get(slot)
867        .ok_or_else(|| InternalError::persisted_row_slot_lookup_out_of_bounds(model.path(), slot))
868}
869
///
/// ScalarValueRef
///
/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
/// slot-reader fast path.
/// Variable-length payloads (`Blob`, `Text`) borrow for lifetime `'a` from the
/// bytes they were decoded from, while every fixed-width scalar wrapper is
/// held by value, which is what lets the whole enum stay `Copy`.
/// Use `into_value` to materialize an owned runtime `Value`.
///

#[derive(Clone, Copy, Debug)]
pub enum ScalarValueRef<'a> {
    // Borrowed variable-length byte payload.
    Blob(&'a [u8]),
    Bool(bool),
    Date(Date),
    Duration(Duration),
    Float32(Float32),
    Float64(Float64),
    Int(i64),
    Principal(Principal),
    Subaccount(Subaccount),
    // Borrowed UTF-8 text payload.
    Text(&'a str),
    Timestamp(Timestamp),
    Uint(u64),
    Ulid(Ulid),
    // Payload-free scalar.
    Unit,
}
896
897impl ScalarValueRef<'_> {
898    /// Materialize this scalar view into the runtime `Value` enum.
899    #[must_use]
900    pub fn into_value(self) -> Value {
901        match self {
902            Self::Blob(value) => Value::Blob(value.to_vec()),
903            Self::Bool(value) => Value::Bool(value),
904            Self::Date(value) => Value::Date(value),
905            Self::Duration(value) => Value::Duration(value),
906            Self::Float32(value) => Value::Float32(value),
907            Self::Float64(value) => Value::Float64(value),
908            Self::Int(value) => Value::Int(value),
909            Self::Principal(value) => Value::Principal(value),
910            Self::Subaccount(value) => Value::Subaccount(value),
911            Self::Text(value) => Value::Text(value.to_owned()),
912            Self::Timestamp(value) => Value::Timestamp(value),
913            Self::Uint(value) => Value::Uint(value),
914            Self::Ulid(value) => Value::Ulid(value),
915            Self::Unit => Value::Unit,
916        }
917    }
918}
919
///
/// ScalarSlotValueRef
///
/// ScalarSlotValueRef preserves the distinction between a missing slot and an
/// explicitly persisted `NULL` scalar payload.
/// The outer `Option` from `SlotReader::get_scalar` therefore still means
/// "slot absent", while `Null` here means the slot is present and its scalar
/// envelope carries the null tag.
///

#[derive(Clone, Copy, Debug)]
pub enum ScalarSlotValueRef<'a> {
    // The slot is present and explicitly encodes `NULL`.
    Null,
    // The slot is present and carries a decoded scalar payload.
    Value(ScalarValueRef<'a>),
}
934
///
/// PersistedScalar
///
/// PersistedScalar defines the canonical binary payload codec for one scalar
/// leaf type.
/// Derive-generated persisted-row materializers and writers use this trait to
/// avoid routing scalar fields back through CBOR.
/// The payload bytes produced and consumed here are the body of the scalar
/// slot envelope only; the two-byte prefix/tag header is added and stripped by
/// the `*_persisted_scalar_slot_payload` helpers.
///

pub trait PersistedScalar: Sized {
    /// Canonical scalar codec identifier used by schema/runtime metadata.
    const CODEC: ScalarCodec;

    /// Encode this scalar value into its codec-specific payload bytes.
    ///
    /// # Errors
    ///
    /// Returns an `InternalError` when the implementation cannot encode the value.
    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;

    /// Decode this scalar value from its codec-specific payload bytes.
    ///
    /// `field_name` is only used to label decode errors.
    ///
    /// # Errors
    ///
    /// Returns an `InternalError` when `bytes` is not a valid payload for this type.
    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
    -> Result<Self, InternalError>;
}
955
956/// Encode one persisted slot payload using the shared leaf codec boundary.
957pub fn encode_persisted_slot_payload<T>(
958    value: &T,
959    field_name: &'static str,
960) -> Result<Vec<u8>, InternalError>
961where
962    T: serde::Serialize,
963{
964    serialize(value)
965        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
966}
967
968/// Encode one persisted scalar slot payload using the canonical scalar envelope.
969pub fn encode_persisted_scalar_slot_payload<T>(
970    value: &T,
971    field_name: &'static str,
972) -> Result<Vec<u8>, InternalError>
973where
974    T: PersistedScalar,
975{
976    let payload = value.encode_scalar_payload()?;
977    let mut encoded = Vec::with_capacity(payload.len() + 2);
978    encoded.push(SCALAR_SLOT_PREFIX);
979    encoded.push(SCALAR_SLOT_TAG_VALUE);
980    encoded.extend_from_slice(&payload);
981
982    if encoded.len() < 2 {
983        return Err(InternalError::persisted_row_field_encode_failed(
984            field_name,
985            "scalar payload envelope underflow",
986        ));
987    }
988
989    Ok(encoded)
990}
991
992/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
993pub fn encode_persisted_option_scalar_slot_payload<T>(
994    value: &Option<T>,
995    field_name: &'static str,
996) -> Result<Vec<u8>, InternalError>
997where
998    T: PersistedScalar,
999{
1000    match value {
1001        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
1002        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
1003    }
1004}
1005
1006/// Decode one persisted slot payload using the shared leaf codec boundary.
1007pub fn decode_persisted_slot_payload<T>(
1008    bytes: &[u8],
1009    field_name: &'static str,
1010) -> Result<T, InternalError>
1011where
1012    T: serde::de::DeserializeOwned,
1013{
1014    deserialize(bytes)
1015        .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1016}
1017
1018/// Decode one persisted scalar slot payload using the canonical scalar envelope.
1019pub fn decode_persisted_scalar_slot_payload<T>(
1020    bytes: &[u8],
1021    field_name: &'static str,
1022) -> Result<T, InternalError>
1023where
1024    T: PersistedScalar,
1025{
1026    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
1027        InternalError::persisted_row_field_decode_failed(
1028            field_name,
1029            "unexpected null for non-nullable scalar field",
1030        )
1031    })?;
1032
1033    T::decode_scalar_payload(payload, field_name)
1034}
1035
1036/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
1037pub fn decode_persisted_option_scalar_slot_payload<T>(
1038    bytes: &[u8],
1039    field_name: &'static str,
1040) -> Result<Option<T>, InternalError>
1041where
1042    T: PersistedScalar,
1043{
1044    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
1045        return Ok(None);
1046    };
1047
1048    T::decode_scalar_payload(payload, field_name).map(Some)
1049}
1050
///
/// SlotBufferWriter
///
/// SlotBufferWriter captures one row worth of slot payloads before they are
/// encoded into the canonical slot container.
/// It holds one entry per model field; an entry stays `None` until a payload
/// is written for that slot, and `finish` flattens the buffer into the final
/// row payload bytes.
///

pub(in crate::db) struct SlotBufferWriter {
    // Indexed by slot; `None` means no payload is currently buffered.
    slots: Vec<Option<Vec<u8>>>,
}
1061
1062impl SlotBufferWriter {
1063    /// Build one empty slot buffer for one entity model.
1064    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
1065        Self {
1066            slots: vec![None; model.fields().len()],
1067        }
1068    }
1069
1070    /// Encode the buffered slots into the canonical row payload.
1071    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
1072        let field_count = u16::try_from(self.slots.len()).map_err(|_| {
1073            InternalError::persisted_row_encode_failed(format!(
1074                "field count {} exceeds u16 slot table capacity",
1075                self.slots.len(),
1076            ))
1077        })?;
1078        let mut payload_bytes = Vec::new();
1079        let mut slot_table = Vec::with_capacity(self.slots.len());
1080
1081        // Phase 1: assign each present slot one payload span inside the data section.
1082        for slot_payload in self.slots {
1083            match slot_payload {
1084                Some(bytes) => {
1085                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
1086                        InternalError::persisted_row_encode_failed(
1087                            "slot payload start exceeds u32 range",
1088                        )
1089                    })?;
1090                    let len = u32::try_from(bytes.len()).map_err(|_| {
1091                        InternalError::persisted_row_encode_failed(
1092                            "slot payload length exceeds u32 range",
1093                        )
1094                    })?;
1095                    payload_bytes.extend_from_slice(&bytes);
1096                    slot_table.push((start, len));
1097                }
1098                None => slot_table.push((0, 0)),
1099            }
1100        }
1101
1102        // Phase 2: write the fixed-width slot header followed by concatenated payloads.
1103        let mut encoded = Vec::with_capacity(
1104            usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
1105        );
1106        encoded.extend_from_slice(&field_count.to_be_bytes());
1107        for (start, len) in slot_table {
1108            encoded.extend_from_slice(&start.to_be_bytes());
1109            encoded.extend_from_slice(&len.to_be_bytes());
1110        }
1111        encoded.extend_from_slice(&payload_bytes);
1112
1113        Ok(encoded)
1114    }
1115}
1116
1117impl SlotWriter for SlotBufferWriter {
1118    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1119        let entry = self.slots.get_mut(slot).ok_or_else(|| {
1120            InternalError::persisted_row_encode_failed(format!(
1121                "slot {slot} is outside the row layout",
1122            ))
1123        })?;
1124        *entry = payload.map(<[u8]>::to_vec);
1125
1126        Ok(())
1127    }
1128}
1129
///
/// SerializedPatchWriter
///
/// SerializedPatchWriter captures a dense typed entity slot image into the
/// serialized patch artifact used by `0.64` mutation staging.
/// Unlike `SlotBufferWriter`, this writer does not flatten into one row payload;
/// it preserves slot-level ownership so later stages can replay the row through
/// the structural patch boundary.
///

struct SerializedPatchWriter {
    // Entity model whose field list defines the slot layout.
    model: &'static EntityModel,
    // One tracking entry per model field, indexed by slot.
    slots: Vec<PatchWriterSlot>,
}
1146
1147impl SerializedPatchWriter {
1148    /// Build one empty serialized patch writer for one entity model.
1149    fn for_model(model: &'static EntityModel) -> Self {
1150        Self {
1151            model,
1152            slots: vec![PatchWriterSlot::Missing; model.fields().len()],
1153        }
1154    }
1155
1156    /// Materialize one dense serialized patch, erroring if the writer failed
1157    /// to emit any declared slot.
1158    fn finish_complete(self) -> Result<SerializedUpdatePatch, InternalError> {
1159        let mut entries = Vec::with_capacity(self.slots.len());
1160
1161        // Phase 1: require a complete slot image so typed save/update staging
1162        // stays equivalent to the existing full-row encoder.
1163        for (slot, payload) in self.slots.into_iter().enumerate() {
1164            let field_slot = FieldSlot::from_index(self.model, slot)?;
1165            let serialized = match payload {
1166                PatchWriterSlot::Set(payload) => {
1167                    SerializedFieldUpdate::new(field_slot, Some(payload))
1168                }
1169                PatchWriterSlot::Clear => SerializedFieldUpdate::new(field_slot, None),
1170                PatchWriterSlot::Missing => {
1171                    return Err(InternalError::persisted_row_encode_failed(format!(
1172                        "serialized patch writer did not emit slot {slot} for entity '{}'",
1173                        self.model.path()
1174                    )));
1175                }
1176            };
1177            entries.push(serialized);
1178        }
1179
1180        Ok(SerializedUpdatePatch::new(entries))
1181    }
1182}
1183
1184impl SlotWriter for SerializedPatchWriter {
1185    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
1186        let entry = self.slots.get_mut(slot).ok_or_else(|| {
1187            InternalError::persisted_row_encode_failed(format!(
1188                "slot {slot} is outside the row layout",
1189            ))
1190        })?;
1191        *entry = match payload {
1192            Some(payload) => PatchWriterSlot::Set(payload.to_vec()),
1193            None => PatchWriterSlot::Clear,
1194        };
1195
1196        Ok(())
1197    }
1198}
1199
///
/// PatchWriterSlot
///
/// PatchWriterSlot tracks whether one dense slot-image writer has emitted a
/// payload, emitted an explicit clear, or failed to visit the slot at all.
/// That lets the typed save/update bridge reject incomplete writers instead of
/// silently leaving stale bytes in the baseline row.
///

#[derive(Clone, Debug, Eq, PartialEq)]
enum PatchWriterSlot {
    // The writer never visited this slot.
    Missing,
    // The writer visited this slot with no payload.
    Clear,
    // The writer emitted these payload bytes for this slot.
    Set(Vec<u8>),
}
1217
///
/// StructuralSlotReader
///
/// StructuralSlotReader adapts the current persisted-row bytes into the
/// canonical slot-reader seam.
/// It caches decoded field values lazily so repeated index/predicate reads do
/// not re-run the same field decoder within one row planning pass.
///

pub(in crate::db) struct StructuralSlotReader<'a> {
    // Entity model whose field list defines the slot layout.
    model: &'static EntityModel,
    // Per-slot raw byte spans scanned out of the borrowed row.
    field_bytes: StructuralRowFieldBytes<'a>,
    // Per-slot decode cache, one entry per model field, filled by `get_value`.
    cached_values: Vec<CachedSlotValue>,
}
1232
1233impl<'a> StructuralSlotReader<'a> {
1234    /// Build one slot reader over one persisted row using the current structural row scanner.
1235    pub(in crate::db) fn from_raw_row(
1236        raw_row: &'a RawRow,
1237        model: &'static EntityModel,
1238    ) -> Result<Self, InternalError> {
1239        let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
1240            .map_err(StructuralRowDecodeError::into_internal_error)?;
1241        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
1242            .take(model.fields().len())
1243            .collect();
1244
1245        Ok(Self {
1246            model,
1247            field_bytes,
1248            cached_values,
1249        })
1250    }
1251
1252    /// Validate the decoded primary-key slot against the authoritative row key.
1253    pub(in crate::db) fn validate_storage_key(
1254        &self,
1255        data_key: &DataKey,
1256    ) -> Result<(), InternalError> {
1257        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
1258            return Err(InternalError::persisted_row_primary_key_field_missing(
1259                self.model.path(),
1260            ));
1261        };
1262        let field = self.field_model(primary_key_slot)?;
1263        let decoded_key = match self.get_scalar(primary_key_slot)? {
1264            Some(ScalarSlotValueRef::Null) => None,
1265            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
1266            None => match self.field_bytes.field(primary_key_slot) {
1267                Some(raw_value) => Some(
1268                    decode_storage_key_field_bytes(raw_value, field.kind).map_err(|err| {
1269                        InternalError::persisted_row_primary_key_not_storage_encodable(
1270                            data_key, err,
1271                        )
1272                    })?,
1273                ),
1274                None => None,
1275            },
1276        };
1277        let Some(decoded_key) = decoded_key else {
1278            return Err(InternalError::persisted_row_primary_key_slot_missing(
1279                data_key,
1280            ));
1281        };
1282        let expected_key = data_key.storage_key();
1283
1284        if decoded_key != expected_key {
1285            return Err(InternalError::persisted_row_key_mismatch(
1286                expected_key,
1287                decoded_key,
1288            ));
1289        }
1290
1291        Ok(())
1292    }
1293
1294    // Resolve one field model entry by stable slot index.
1295    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
1296        field_model_for_slot(self.model, slot)
1297    }
1298}
1299
1300// Convert one scalar slot fast-path value into its storage-key form when the
1301// field kind is storage-key-compatible.
1302const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
1303    match value {
1304        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
1305        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
1306        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
1307        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
1308        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
1309        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
1310        ScalarValueRef::Unit => Some(StorageKey::Unit),
1311        _ => None,
1312    }
1313}
1314
1315impl SlotReader for StructuralSlotReader<'_> {
1316    fn model(&self) -> &'static EntityModel {
1317        self.model
1318    }
1319
1320    fn has(&self, slot: usize) -> bool {
1321        self.field_bytes.field(slot).is_some()
1322    }
1323
1324    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
1325        self.field_bytes.field(slot)
1326    }
1327
1328    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
1329        let field = self.field_model(slot)?;
1330        let Some(raw_value) = self.field_bytes.field(slot) else {
1331            return Ok(None);
1332        };
1333
1334        match field.leaf_codec() {
1335            LeafCodec::Scalar(codec) => {
1336                decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
1337            }
1338            LeafCodec::CborFallback => Ok(None),
1339        }
1340    }
1341
1342    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
1343        let cached = self.cached_values.get(slot).ok_or_else(|| {
1344            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(self.model.path(), slot)
1345        })?;
1346        if let CachedSlotValue::Decoded(value) = cached {
1347            return Ok(value.clone());
1348        }
1349
1350        let value = match self.field_bytes.field(slot) {
1351            Some(raw_value) => Some(decode_slot_value_from_bytes(self.model, slot, raw_value)?),
1352            None => None,
1353        };
1354        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
1355
1356        Ok(value)
1357    }
1358}
1359
///
/// CachedSlotValue
///
/// CachedSlotValue tracks whether one slot has already been decoded during the
/// current structural row access pass.
///

enum CachedSlotValue {
    // The slot has not been decoded yet.
    Pending,
    // The slot was decoded; `None` records that the row carried no bytes for it.
    Decoded(Option<Value>),
}
1371
1372// Encode one scalar slot value into the canonical prefixed scalar envelope.
1373fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
1374    match value {
1375        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
1376        ScalarSlotValueRef::Value(value) => {
1377            let mut encoded = Vec::new();
1378            encoded.push(SCALAR_SLOT_PREFIX);
1379            encoded.push(SCALAR_SLOT_TAG_VALUE);
1380
1381            match value {
1382                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
1383                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
1384                ScalarValueRef::Date(value) => {
1385                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
1386                }
1387                ScalarValueRef::Duration(value) => {
1388                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
1389                }
1390                ScalarValueRef::Float32(value) => {
1391                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
1392                }
1393                ScalarValueRef::Float64(value) => {
1394                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
1395                }
1396                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
1397                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
1398                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
1399                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
1400                ScalarValueRef::Timestamp(value) => {
1401                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
1402                }
1403                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
1404                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
1405                ScalarValueRef::Unit => {}
1406            }
1407
1408            encoded
1409        }
1410    }
1411}
1412
1413// Split one scalar slot envelope into `NULL` vs payload bytes.
1414fn decode_scalar_slot_payload_body<'a>(
1415    bytes: &'a [u8],
1416    field_name: &'static str,
1417) -> Result<Option<&'a [u8]>, InternalError> {
1418    let Some((&prefix, rest)) = bytes.split_first() else {
1419        return Err(InternalError::persisted_row_field_decode_failed(
1420            field_name,
1421            "empty scalar payload",
1422        ));
1423    };
1424    if prefix != SCALAR_SLOT_PREFIX {
1425        return Err(InternalError::persisted_row_field_decode_failed(
1426            field_name,
1427            "scalar payload prefix mismatch",
1428        ));
1429    }
1430    let Some((&tag, payload)) = rest.split_first() else {
1431        return Err(InternalError::persisted_row_field_decode_failed(
1432            field_name,
1433            "truncated scalar payload tag",
1434        ));
1435    };
1436
1437    match tag {
1438        SCALAR_SLOT_TAG_NULL => {
1439            if !payload.is_empty() {
1440                return Err(InternalError::persisted_row_field_decode_failed(
1441                    field_name,
1442                    "null scalar payload has trailing bytes",
1443                ));
1444            }
1445
1446            Ok(None)
1447        }
1448        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
1449        _ => Err(InternalError::persisted_row_field_decode_failed(
1450            field_name,
1451            format!("invalid scalar payload tag {tag}"),
1452        )),
1453    }
1454}
1455
1456// Decode one scalar slot view using the field-declared scalar codec.
1457#[expect(clippy::too_many_lines)]
1458fn decode_scalar_slot_value<'a>(
1459    bytes: &'a [u8],
1460    codec: ScalarCodec,
1461    field_name: &'static str,
1462) -> Result<ScalarSlotValueRef<'a>, InternalError> {
1463    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
1464        return Ok(ScalarSlotValueRef::Null);
1465    };
1466
1467    let value = match codec {
1468        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
1469        ScalarCodec::Bool => {
1470            let [value] = payload else {
1471                return Err(
1472                    InternalError::persisted_row_field_payload_exact_len_required(
1473                        field_name,
1474                        "bool",
1475                        SCALAR_BOOL_PAYLOAD_LEN,
1476                    ),
1477                );
1478            };
1479            match *value {
1480                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
1481                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
1482                _ => {
1483                    return Err(InternalError::persisted_row_field_payload_invalid_byte(
1484                        field_name, "bool", *value,
1485                    ));
1486                }
1487            }
1488        }
1489        ScalarCodec::Date => {
1490            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1491                InternalError::persisted_row_field_payload_exact_len_required(
1492                    field_name,
1493                    "date",
1494                    SCALAR_WORD32_PAYLOAD_LEN,
1495                )
1496            })?;
1497            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
1498        }
1499        ScalarCodec::Duration => {
1500            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1501                InternalError::persisted_row_field_payload_exact_len_required(
1502                    field_name,
1503                    "duration",
1504                    SCALAR_WORD64_PAYLOAD_LEN,
1505                )
1506            })?;
1507            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
1508        }
1509        ScalarCodec::Float32 => {
1510            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1511                InternalError::persisted_row_field_payload_exact_len_required(
1512                    field_name,
1513                    "float32",
1514                    SCALAR_WORD32_PAYLOAD_LEN,
1515                )
1516            })?;
1517            let value = f32::from_bits(u32::from_le_bytes(bytes));
1518            let value = Float32::try_new(value).ok_or_else(|| {
1519                InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
1520            })?;
1521            ScalarValueRef::Float32(value)
1522        }
1523        ScalarCodec::Float64 => {
1524            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1525                InternalError::persisted_row_field_payload_exact_len_required(
1526                    field_name,
1527                    "float64",
1528                    SCALAR_WORD64_PAYLOAD_LEN,
1529                )
1530            })?;
1531            let value = f64::from_bits(u64::from_le_bytes(bytes));
1532            let value = Float64::try_new(value).ok_or_else(|| {
1533                InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
1534            })?;
1535            ScalarValueRef::Float64(value)
1536        }
1537        ScalarCodec::Int64 => {
1538            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1539                InternalError::persisted_row_field_payload_exact_len_required(
1540                    field_name,
1541                    "int",
1542                    SCALAR_WORD64_PAYLOAD_LEN,
1543                )
1544            })?;
1545            ScalarValueRef::Int(i64::from_le_bytes(bytes))
1546        }
1547        ScalarCodec::Principal => ScalarValueRef::Principal(
1548            Principal::try_from_bytes(payload)
1549                .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))?,
1550        ),
1551        ScalarCodec::Subaccount => {
1552            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1553                InternalError::persisted_row_field_payload_exact_len_required(
1554                    field_name,
1555                    "subaccount",
1556                    SCALAR_SUBACCOUNT_PAYLOAD_LEN,
1557                )
1558            })?;
1559            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
1560        }
1561        ScalarCodec::Text => {
1562            let value = str::from_utf8(payload).map_err(|err| {
1563                InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
1564            })?;
1565            ScalarValueRef::Text(value)
1566        }
1567        ScalarCodec::Timestamp => {
1568            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1569                InternalError::persisted_row_field_payload_exact_len_required(
1570                    field_name,
1571                    "timestamp",
1572                    SCALAR_WORD64_PAYLOAD_LEN,
1573                )
1574            })?;
1575            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
1576        }
1577        ScalarCodec::Uint64 => {
1578            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1579                InternalError::persisted_row_field_payload_exact_len_required(
1580                    field_name,
1581                    "uint",
1582                    SCALAR_WORD64_PAYLOAD_LEN,
1583                )
1584            })?;
1585            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
1586        }
1587        ScalarCodec::Ulid => {
1588            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
1589                InternalError::persisted_row_field_payload_exact_len_required(
1590                    field_name,
1591                    "ulid",
1592                    SCALAR_ULID_PAYLOAD_LEN,
1593                )
1594            })?;
1595            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
1596        }
1597        ScalarCodec::Unit => {
1598            if !payload.is_empty() {
1599                return Err(InternalError::persisted_row_field_payload_must_be_empty(
1600                    field_name, "unit",
1601                ));
1602            }
1603            ScalarValueRef::Unit
1604        }
1605    };
1606
1607    Ok(ScalarSlotValueRef::Value(value))
1608}
1609
// Implement `PersistedScalar` for signed integer types.
// Every listed type is widened to `i64` and stored as eight little-endian
// bytes under the `Int64` codec; decoding narrows back and rejects values that
// do not fit the target type.
macro_rules! impl_persisted_scalar_signed {
    ($($int:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $int {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "int",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = i64::from_le_bytes(raw);

                    <$int>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "integer",
                        )
                    })
                }
            }
        )*
    };
}
1642
// Implement `PersistedScalar` for unsigned integer types.
// Every listed type is widened to `u64` and stored as eight little-endian
// bytes under the `Uint64` codec; decoding narrows back and rejects values
// that do not fit the target type.
macro_rules! impl_persisted_scalar_unsigned {
    ($($uint:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $uint {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "uint",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = u64::from_le_bytes(raw);

                    <$uint>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "unsigned",
                        )
                    })
                }
            }
        )*
    };
}
1675
1676impl_persisted_scalar_signed!(i8, i16, i32, i64);
1677impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
1678
1679impl PersistedScalar for bool {
1680    const CODEC: ScalarCodec = ScalarCodec::Bool;
1681
1682    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1683        Ok(vec![u8::from(*self)])
1684    }
1685
1686    fn decode_scalar_payload(
1687        bytes: &[u8],
1688        field_name: &'static str,
1689    ) -> Result<Self, InternalError> {
1690        let [value] = bytes else {
1691            return Err(
1692                InternalError::persisted_row_field_payload_exact_len_required(
1693                    field_name,
1694                    "bool",
1695                    SCALAR_BOOL_PAYLOAD_LEN,
1696                ),
1697            );
1698        };
1699
1700        match *value {
1701            SCALAR_BOOL_FALSE_TAG => Ok(false),
1702            SCALAR_BOOL_TRUE_TAG => Ok(true),
1703            _ => Err(InternalError::persisted_row_field_payload_invalid_byte(
1704                field_name, "bool", *value,
1705            )),
1706        }
1707    }
1708}
1709
1710impl PersistedScalar for String {
1711    const CODEC: ScalarCodec = ScalarCodec::Text;
1712
1713    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1714        Ok(self.as_bytes().to_vec())
1715    }
1716
1717    fn decode_scalar_payload(
1718        bytes: &[u8],
1719        field_name: &'static str,
1720    ) -> Result<Self, InternalError> {
1721        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
1722            InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
1723        })
1724    }
1725}
1726
1727impl PersistedScalar for Vec<u8> {
1728    const CODEC: ScalarCodec = ScalarCodec::Blob;
1729
1730    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1731        Ok(self.clone())
1732    }
1733
1734    fn decode_scalar_payload(
1735        bytes: &[u8],
1736        _field_name: &'static str,
1737    ) -> Result<Self, InternalError> {
1738        Ok(bytes.to_vec())
1739    }
1740}
1741
1742impl PersistedScalar for Blob {
1743    const CODEC: ScalarCodec = ScalarCodec::Blob;
1744
1745    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1746        Ok(self.to_vec())
1747    }
1748
1749    fn decode_scalar_payload(
1750        bytes: &[u8],
1751        _field_name: &'static str,
1752    ) -> Result<Self, InternalError> {
1753        Ok(Self::from(bytes))
1754    }
1755}
1756
1757impl PersistedScalar for Ulid {
1758    const CODEC: ScalarCodec = ScalarCodec::Ulid;
1759
1760    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1761        Ok(self.to_bytes().to_vec())
1762    }
1763
1764    fn decode_scalar_payload(
1765        bytes: &[u8],
1766        field_name: &'static str,
1767    ) -> Result<Self, InternalError> {
1768        Self::try_from_bytes(bytes)
1769            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1770    }
1771}
1772
1773impl PersistedScalar for Timestamp {
1774    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
1775
1776    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1777        Ok(self.as_millis().to_le_bytes().to_vec())
1778    }
1779
1780    fn decode_scalar_payload(
1781        bytes: &[u8],
1782        field_name: &'static str,
1783    ) -> Result<Self, InternalError> {
1784        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1785            InternalError::persisted_row_field_payload_exact_len_required(
1786                field_name,
1787                "timestamp",
1788                SCALAR_WORD64_PAYLOAD_LEN,
1789            )
1790        })?;
1791
1792        Ok(Self::from_millis(i64::from_le_bytes(raw)))
1793    }
1794}
1795
1796impl PersistedScalar for Date {
1797    const CODEC: ScalarCodec = ScalarCodec::Date;
1798
1799    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1800        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1801    }
1802
1803    fn decode_scalar_payload(
1804        bytes: &[u8],
1805        field_name: &'static str,
1806    ) -> Result<Self, InternalError> {
1807        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1808            InternalError::persisted_row_field_payload_exact_len_required(
1809                field_name,
1810                "date",
1811                SCALAR_WORD32_PAYLOAD_LEN,
1812            )
1813        })?;
1814
1815        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1816    }
1817}
1818
1819impl PersistedScalar for Duration {
1820    const CODEC: ScalarCodec = ScalarCodec::Duration;
1821
1822    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1823        Ok(self.as_millis().to_le_bytes().to_vec())
1824    }
1825
1826    fn decode_scalar_payload(
1827        bytes: &[u8],
1828        field_name: &'static str,
1829    ) -> Result<Self, InternalError> {
1830        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1831            InternalError::persisted_row_field_payload_exact_len_required(
1832                field_name,
1833                "duration",
1834                SCALAR_WORD64_PAYLOAD_LEN,
1835            )
1836        })?;
1837
1838        Ok(Self::from_millis(u64::from_le_bytes(raw)))
1839    }
1840}
1841
1842impl PersistedScalar for Float32 {
1843    const CODEC: ScalarCodec = ScalarCodec::Float32;
1844
1845    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1846        Ok(self.get().to_bits().to_le_bytes().to_vec())
1847    }
1848
1849    fn decode_scalar_payload(
1850        bytes: &[u8],
1851        field_name: &'static str,
1852    ) -> Result<Self, InternalError> {
1853        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1854            InternalError::persisted_row_field_payload_exact_len_required(
1855                field_name,
1856                "float32",
1857                SCALAR_WORD32_PAYLOAD_LEN,
1858            )
1859        })?;
1860        let value = f32::from_bits(u32::from_le_bytes(raw));
1861
1862        Self::try_new(value).ok_or_else(|| {
1863            InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
1864        })
1865    }
1866}
1867
1868impl PersistedScalar for Float64 {
1869    const CODEC: ScalarCodec = ScalarCodec::Float64;
1870
1871    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1872        Ok(self.get().to_bits().to_le_bytes().to_vec())
1873    }
1874
1875    fn decode_scalar_payload(
1876        bytes: &[u8],
1877        field_name: &'static str,
1878    ) -> Result<Self, InternalError> {
1879        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1880            InternalError::persisted_row_field_payload_exact_len_required(
1881                field_name,
1882                "float64",
1883                SCALAR_WORD64_PAYLOAD_LEN,
1884            )
1885        })?;
1886        let value = f64::from_bits(u64::from_le_bytes(raw));
1887
1888        Self::try_new(value).ok_or_else(|| {
1889            InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
1890        })
1891    }
1892}
1893
1894impl PersistedScalar for Principal {
1895    const CODEC: ScalarCodec = ScalarCodec::Principal;
1896
1897    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1898        self.to_bytes()
1899            .map_err(|err| InternalError::persisted_row_field_encode_failed("principal", err))
1900    }
1901
1902    fn decode_scalar_payload(
1903        bytes: &[u8],
1904        field_name: &'static str,
1905    ) -> Result<Self, InternalError> {
1906        Self::try_from_bytes(bytes)
1907            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1908    }
1909}
1910
1911impl PersistedScalar for Subaccount {
1912    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1913
1914    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1915        Ok(self.to_bytes().to_vec())
1916    }
1917
1918    fn decode_scalar_payload(
1919        bytes: &[u8],
1920        field_name: &'static str,
1921    ) -> Result<Self, InternalError> {
1922        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1923            InternalError::persisted_row_field_payload_exact_len_required(
1924                field_name,
1925                "subaccount",
1926                SCALAR_SUBACCOUNT_PAYLOAD_LEN,
1927            )
1928        })?;
1929
1930        Ok(Self::from_array(raw))
1931    }
1932}
1933
1934impl PersistedScalar for () {
1935    const CODEC: ScalarCodec = ScalarCodec::Unit;
1936
1937    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1938        Ok(Vec::new())
1939    }
1940
1941    fn decode_scalar_payload(
1942        bytes: &[u8],
1943        field_name: &'static str,
1944    ) -> Result<Self, InternalError> {
1945        if !bytes.is_empty() {
1946            return Err(InternalError::persisted_row_field_payload_must_be_empty(
1947                field_name, "unit",
1948            ));
1949        }
1950
1951        Ok(())
1952    }
1953}
1954
1955///
1956/// TESTS
1957///
1958
#[cfg(test)]
mod tests {
    use super::{
        FieldSlot, ScalarSlotValueRef, ScalarValueRef, SlotBufferWriter, SlotReader, SlotWriter,
        UpdatePatch, apply_update_patch_to_raw_row, decode_slot_value_by_contract,
        decode_slot_value_from_bytes, encode_scalar_slot_value, encode_slot_value_from_value,
        serialize_entity_slots_as_update_patch, serialize_update_patch_fields,
    };
    use crate::{
        db::{
            codec::serialize_row_payload,
            data::{RawRow, StructuralSlotReader},
        },
        model::{
            EntityModel,
            field::{EnumVariantModel, FieldKind, FieldModel, FieldStorageDecode},
        },
        testing::SIMPLE_ENTITY_TAG,
        traits::EntitySchema,
        types::{Account, Principal, Subaccount},
        value::{Value, ValueEnum},
    };
    use icydb_derive::{FieldProjection, PersistedRow};
    use serde::{Deserialize, Serialize};

    crate::test_canister! {
        ident = PersistedRowPatchBridgeCanister,
        commit_memory_id = crate::testing::test_commit_memory_id(),
    }

    crate::test_store! {
        ident = PersistedRowPatchBridgeStore,
        canister = PersistedRowPatchBridgeCanister,
    }

    ///
    /// PersistedRowPatchBridgeEntity
    ///
    /// Smallest derive-owned entity used to validate the
    /// typed-entity -> serialized-patch bridge. It lets the persisted-row
    /// tests exercise the same dense slot writer the save/update path uses.
    ///

    #[derive(
        Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, PersistedRow, Serialize,
    )]
    struct PersistedRowPatchBridgeEntity {
        id: crate::types::Ulid,
        name: String,
    }

    crate::test_entity_schema! {
        ident = PersistedRowPatchBridgeEntity,
        id = crate::types::Ulid,
        id_field = id,
        entity_name = "PersistedRowPatchBridgeEntity",
        entity_tag = SIMPLE_ENTITY_TAG,
        pk_index = 0,
        fields = [
            ("id", FieldKind::Ulid),
            ("name", FieldKind::Text),
        ],
        indexes = [],
        store = PersistedRowPatchBridgeStore,
        canister = PersistedRowPatchBridgeCanister,
    }

    // Single enum variant carrying a `Uint` payload, used by the enum slot tests.
    static STATE_VARIANTS: &[EnumVariantModel] = &[EnumVariantModel::new(
        "Loaded",
        Some(&FieldKind::Uint),
        FieldStorageDecode::ByKind,
    )];

    // Slot 0 is a plain text field; slot 1 is a text field that opts into
    // `FieldStorageDecode::Value`.
    static FIELD_MODELS: [FieldModel; 2] = [
        FieldModel::new("name", FieldKind::Text),
        FieldModel::new_with_storage_decode("payload", FieldKind::Text, FieldStorageDecode::Value),
    ];

    // One-field layouts, each covering a different field kind.
    static LIST_FIELD_MODELS: [FieldModel; 1] =
        [FieldModel::new("tags", FieldKind::List(&FieldKind::Text))];
    static MAP_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
        "props",
        FieldKind::Map {
            key: &FieldKind::Text,
            value: &FieldKind::Uint,
        },
    )];
    static ENUM_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new(
        "state",
        FieldKind::Enum {
            path: "tests::State",
            variants: STATE_VARIANTS,
        },
    )];
    static ACCOUNT_FIELD_MODELS: [FieldModel; 1] = [FieldModel::new("owner", FieldKind::Account)];

    // None of the test models declare indexes.
    static INDEX_MODELS: [&crate::model::index::IndexModel; 0] = [];

    // Entity models wrapping the field layouts above.
    static TEST_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowFieldCodecEntity",
        "persisted_row_field_codec_entity",
        &FIELD_MODELS[0],
        &FIELD_MODELS,
        &INDEX_MODELS,
    );
    static LIST_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowListFieldCodecEntity",
        "persisted_row_list_field_codec_entity",
        &LIST_FIELD_MODELS[0],
        &LIST_FIELD_MODELS,
        &INDEX_MODELS,
    );
    static MAP_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowMapFieldCodecEntity",
        "persisted_row_map_field_codec_entity",
        &MAP_FIELD_MODELS[0],
        &MAP_FIELD_MODELS,
        &INDEX_MODELS,
    );
    static ENUM_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowEnumFieldCodecEntity",
        "persisted_row_enum_field_codec_entity",
        &ENUM_FIELD_MODELS[0],
        &ENUM_FIELD_MODELS,
        &INDEX_MODELS,
    );
    static ACCOUNT_MODEL: EntityModel = EntityModel::new(
        "tests::PersistedRowAccountFieldCodecEntity",
        "persisted_row_account_field_codec_entity",
        &ACCOUNT_FIELD_MODELS[0],
        &ACCOUNT_FIELD_MODELS,
        &INDEX_MODELS,
    );

    // Build an owned `Value::Text` from a string slice.
    fn text_value(text: &str) -> Value {
        Value::Text(text.to_string())
    }

    #[test]
    fn decode_slot_value_from_bytes_decodes_scalar_slots_through_one_owner() {
        let encoded =
            encode_scalar_slot_value(ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")));

        let decoded =
            decode_slot_value_from_bytes(&TEST_MODEL, 0, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, text_value("Ada"));
    }

    #[test]
    fn decode_slot_value_from_bytes_respects_value_storage_decode_contract() {
        let original = text_value("Ada");
        let encoded =
            crate::serialize::serialize(&original).expect("encode value-storage payload");

        let decoded =
            decode_slot_value_from_bytes(&TEST_MODEL, 1, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_roundtrips_scalar_slots() {
        let original = text_value("Ada");

        let encoded =
            encode_slot_value_from_value(&TEST_MODEL, 0, &original).expect("encode slot");
        let decoded =
            decode_slot_value_from_bytes(&TEST_MODEL, 0, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_roundtrips_value_storage_slots() {
        let original = text_value("Ada");

        let encoded =
            encode_slot_value_from_value(&TEST_MODEL, 1, &original).expect("encode slot");
        let decoded =
            decode_slot_value_from_bytes(&TEST_MODEL, 1, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_roundtrips_list_by_kind_slots() {
        let original = Value::List(vec![text_value("alpha")]);

        let encoded =
            encode_slot_value_from_value(&LIST_MODEL, 0, &original).expect("encode list slot");
        let decoded =
            decode_slot_value_from_bytes(&LIST_MODEL, 0, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_roundtrips_map_by_kind_slots() {
        let original = Value::Map(vec![(text_value("alpha"), Value::Uint(7))]);

        let encoded =
            encode_slot_value_from_value(&MAP_MODEL, 0, &original).expect("encode map slot");
        let decoded =
            decode_slot_value_from_bytes(&MAP_MODEL, 0, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_roundtrips_enum_by_kind_slots() {
        let original = Value::Enum(
            ValueEnum::new("Loaded", Some("tests::State")).with_payload(Value::Uint(7)),
        );

        let encoded =
            encode_slot_value_from_value(&ENUM_MODEL, 0, &original).expect("encode enum slot");
        let decoded =
            decode_slot_value_from_bytes(&ENUM_MODEL, 0, encoded.as_slice()).expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_roundtrips_leaf_by_kind_wrapper_slots() {
        let account = Account::from_parts(Principal::dummy(7), Some(Subaccount::from([7_u8; 32])));
        let original = Value::Account(account);

        let encoded = encode_slot_value_from_value(&ACCOUNT_MODEL, 0, &original)
            .expect("encode account slot");
        let decoded = decode_slot_value_from_bytes(&ACCOUNT_MODEL, 0, encoded.as_slice())
            .expect("decode slot");

        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_slot_value_from_value_rejects_unknown_enum_payload_variants() {
        // "Unknown" is not listed in `STATE_VARIANTS`.
        let unknown = Value::Enum(
            ValueEnum::new("Unknown", Some("tests::State")).with_payload(Value::Uint(7)),
        );

        let outcome = encode_slot_value_from_value(&ENUM_MODEL, 0, &unknown);
        let err = outcome.expect_err("unknown enum payload should fail closed");

        assert!(err.message.contains("unknown enum variant"));
    }

    #[test]
    fn structural_slot_reader_and_direct_decode_share_the_same_field_codec_boundary() {
        let value_payload = crate::serialize::serialize(&text_value("payload"))
            .expect("encode value-storage payload");
        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
        writer
            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
            .expect("write scalar slot");
        writer
            .write_slot(1, Some(value_payload.as_slice()))
            .expect("write value-storage slot");
        let slot_payload = writer.finish().expect("finish slot payload");
        let row_bytes = serialize_row_payload(slot_payload).expect("serialize row payload");
        let raw_row = RawRow::try_new(row_bytes).expect("build raw row");

        // Two independent readers over the same row: one is decoded directly
        // by contract, the other goes through `get_value`.
        let direct_slots =
            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");
        let mut cached_slots =
            StructuralSlotReader::from_raw_row(&raw_row, &TEST_MODEL).expect("decode row");

        let direct_name = decode_slot_value_by_contract(&direct_slots, 0).expect("decode name");
        let direct_payload =
            decode_slot_value_by_contract(&direct_slots, 1).expect("decode payload");
        let cached_name = cached_slots.get_value(0).expect("cached name");
        let cached_payload = cached_slots.get_value(1).expect("cached payload");

        assert_eq!(direct_name, cached_name);
        assert_eq!(direct_payload, cached_payload);
    }

    #[test]
    fn apply_update_patch_to_raw_row_updates_only_targeted_slots() {
        let value_payload = crate::serialize::serialize(&text_value("payload"))
            .expect("encode value-storage payload");
        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
        writer
            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
            .expect("write scalar slot");
        writer
            .write_slot(1, Some(value_payload.as_slice()))
            .expect("write value-storage slot");
        let slot_payload = writer.finish().expect("finish slot payload");
        let row_bytes = serialize_row_payload(slot_payload).expect("serialize row payload");
        let raw_row = RawRow::try_new(row_bytes).expect("build raw row");

        // Patch only slot 0; slot 1 must keep its original value.
        let name_slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
        let patch = UpdatePatch::new().set(name_slot, text_value("Grace"));

        let patched =
            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
        let mut reader =
            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");

        let patched_name = reader.get_value(0).expect("decode slot");
        assert_eq!(patched_name, Some(text_value("Grace")));
        let untouched_payload = reader.get_value(1).expect("decode slot");
        assert_eq!(untouched_payload, Some(text_value("payload")));
    }

    #[test]
    fn serialize_update_patch_fields_encodes_canonical_slot_payloads() {
        let name_slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
        let payload_slot = FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot");
        let patch = UpdatePatch::new()
            .set(name_slot, text_value("Grace"))
            .set(payload_slot, text_value("payload"));

        let serialized =
            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");
        let entries = serialized.entries();

        assert_eq!(entries.len(), 2);

        let decoded_name = decode_slot_value_from_bytes(
            &TEST_MODEL,
            entries[0].slot().index(),
            entries[0]
                .payload()
                .expect("serialized field update should carry payload"),
        )
        .expect("decode slot payload");
        assert_eq!(decoded_name, text_value("Grace"));

        let decoded_payload = decode_slot_value_from_bytes(
            &TEST_MODEL,
            entries[1].slot().index(),
            entries[1]
                .payload()
                .expect("serialized field update should carry payload"),
        )
        .expect("decode slot payload");
        assert_eq!(decoded_payload, text_value("payload"));
    }

    #[test]
    fn apply_update_patch_to_raw_row_uses_last_write_wins() {
        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
        writer
            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
            .expect("write scalar slot");
        let slot_payload = writer.finish().expect("finish slot payload");
        let row_bytes = serialize_row_payload(slot_payload).expect("serialize row payload");
        let raw_row = RawRow::try_new(row_bytes).expect("build raw row");

        // The same slot is set twice; the later value must be the one stored.
        let slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
        let patch = UpdatePatch::new()
            .set(slot, text_value("Grace"))
            .set(slot, text_value("Lin"));

        let patched =
            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
        let mut reader =
            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");

        let final_name = reader.get_value(0).expect("decode slot");
        assert_eq!(final_name, Some(text_value("Lin")));
    }

    #[test]
    fn apply_update_patch_to_raw_row_can_fill_previously_absent_slot() {
        // Start from a row whose writer never wrote any slot.
        let row_bytes = serialize_row_payload(
            SlotBufferWriter::for_model(&TEST_MODEL)
                .finish()
                .expect("finish slot payload"),
        )
        .expect("serialize row payload");
        let raw_row = RawRow::try_new(row_bytes).expect("build raw row");

        let payload_slot = FieldSlot::from_index(&TEST_MODEL, 1).expect("resolve slot");
        let patch = UpdatePatch::new().set(payload_slot, text_value("payload"));

        let patched =
            apply_update_patch_to_raw_row(&TEST_MODEL, &raw_row, &patch).expect("apply patch");
        let mut reader =
            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");

        let filled_payload = reader.get_value(1).expect("decode slot");
        assert_eq!(filled_payload, Some(text_value("payload")));
    }

    #[test]
    fn apply_serialized_update_patch_to_raw_row_replays_preencoded_slots() {
        let mut writer = SlotBufferWriter::for_model(&TEST_MODEL);
        writer
            .write_scalar(0, ScalarSlotValueRef::Value(ScalarValueRef::Text("Ada")))
            .expect("write scalar slot");
        let slot_payload = writer.finish().expect("finish slot payload");
        let row_bytes = serialize_row_payload(slot_payload).expect("serialize row payload");
        let raw_row = RawRow::try_new(row_bytes).expect("build raw row");

        // Serialize the patch first, then replay the pre-encoded form.
        let name_slot = FieldSlot::from_index(&TEST_MODEL, 0).expect("resolve slot");
        let patch = UpdatePatch::new().set(name_slot, text_value("Grace"));
        let serialized =
            serialize_update_patch_fields(&TEST_MODEL, &patch).expect("serialize patch");

        let patched = raw_row
            .apply_serialized_update_patch(&TEST_MODEL, &serialized)
            .expect("apply serialized patch");
        let mut reader =
            StructuralSlotReader::from_raw_row(&patched, &TEST_MODEL).expect("decode row");

        let patched_name = reader.get_value(0).expect("decode slot");
        assert_eq!(patched_name, Some(text_value("Grace")));
    }

    #[test]
    fn serialize_entity_slots_as_update_patch_replays_full_typed_after_image() {
        let old_entity = PersistedRowPatchBridgeEntity {
            id: crate::types::Ulid::from_u128(7),
            name: "Ada".to_string(),
        };
        // Same id as the old entity; only the name differs.
        let new_entity = PersistedRowPatchBridgeEntity {
            name: "Grace".to_string(),
            ..old_entity.clone()
        };

        let raw_row = RawRow::from_entity(&old_entity).expect("encode old row");
        let old_decoded = raw_row
            .try_decode::<PersistedRowPatchBridgeEntity>()
            .expect("decode old entity");
        let serialized =
            serialize_entity_slots_as_update_patch(&new_entity).expect("serialize entity patch");

        let patched = raw_row
            .apply_serialized_update_patch(PersistedRowPatchBridgeEntity::MODEL, &serialized)
            .expect("apply serialized patch");
        let decoded = patched
            .try_decode::<PersistedRowPatchBridgeEntity>()
            .expect("decode patched entity");

        assert_eq!(old_decoded, old_entity);
        assert_eq!(decoded, new_entity);
    }
}