Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::data::{
9        DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10        decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
11        decode_structural_value_storage_bytes,
12    },
13    error::InternalError,
14    model::{
15        entity::{EntityModel, resolve_primary_key_slot},
16        field::{FieldModel, LeafCodec, ScalarCodec},
17    },
18    serialize::{deserialize, serialize},
19    traits::EntityKind,
20    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
21    value::{StorageKey, Value},
22};
23use std::str;
24
// Scalar slot envelope markers: one prefix byte, then one tag byte that
// distinguishes an explicit `NULL` from a value followed by its payload.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
const SCALAR_SLOT_TAG_NULL: u8 = 0;
const SCALAR_SLOT_TAG_VALUE: u8 = 1;

// Exact payload widths, in bytes, required by the fixed-width scalar codecs.
const SCALAR_BOOL_PAYLOAD_LEN: usize = std::mem::size_of::<u8>();
const SCALAR_WORD32_PAYLOAD_LEN: usize = std::mem::size_of::<u32>();
const SCALAR_WORD64_PAYLOAD_LEN: usize = std::mem::size_of::<u64>();
const SCALAR_ULID_PAYLOAD_LEN: usize = std::mem::size_of::<u128>();
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// Byte values accepted for a persisted bool payload.
const SCALAR_BOOL_FALSE_TAG: u8 = 0;
const SCALAR_BOOL_TRUE_TAG: u8 = 1;
37
38///
39/// SlotReader
40///
41/// SlotReader exposes one persisted row as stable slot-addressable fields.
42/// Callers may inspect field presence, borrow raw field bytes, or decode one
43/// field value on demand.
44///
45
46pub trait SlotReader {
47    /// Return the structural model that owns this slot mapping.
48    fn model(&self) -> &'static EntityModel;
49
50    /// Return whether the given slot is present in the persisted row.
51    fn has(&self, slot: usize) -> bool;
52
53    /// Borrow the raw persisted payload for one slot when present.
54    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
55
56    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
57    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
58
59    /// Decode one slot value on demand using the field contract declared by the model.
60    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
61}
62
63///
64/// SlotWriter
65///
66/// SlotWriter is the canonical row-container output seam used by persisted-row
67/// writers.
68///
69
70pub trait SlotWriter {
71    /// Record one slot payload for the current row.
72    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
73
74    /// Record one scalar slot payload using the canonical scalar leaf envelope.
75    fn write_scalar(
76        &mut self,
77        slot: usize,
78        value: ScalarSlotValueRef<'_>,
79    ) -> Result<(), InternalError> {
80        let payload = encode_scalar_slot_value(value);
81
82        self.write_slot(slot, Some(payload.as_slice()))
83    }
84}
85
86///
87/// PersistedRow
88///
89/// PersistedRow is the derive-owned bridge between typed entities and
90/// slot-addressable persisted rows.
91/// It owns entity-specific materialization/default semantics while runtime
92/// paths stay structural at the row boundary.
93///
94
95pub trait PersistedRow: EntityKind + Sized {
96    /// Materialize one typed entity from one slot reader.
97    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
98
99    /// Write one typed entity into one slot writer.
100    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
101
102    /// Decode one slot value needed by structural planner/projection consumers.
103    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
104    where
105        Self: crate::traits::FieldProjection,
106    {
107        let entity = Self::materialize_from_slots(slots)?;
108
109        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
110    }
111}
112
113/// Decode one slot value through the declared field contract without routing
114/// through `SlotReader::get_value`.
115pub(in crate::db) fn decode_slot_value_by_contract(
116    slots: &dyn SlotReader,
117    slot: usize,
118) -> Result<Option<Value>, InternalError> {
119    let field = slots.model().fields().get(slot).ok_or_else(|| {
120        InternalError::index_invariant(format!(
121            "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
122            slots.model().path(),
123        ))
124    })?;
125
126    if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
127        && let Some(value) = slots.get_scalar(slot)?
128    {
129        return Ok(Some(match value {
130            ScalarSlotValueRef::Null => Value::Null,
131            ScalarSlotValueRef::Value(value) => value.into_value(),
132        }));
133    }
134
135    match slots.get_bytes(slot) {
136        Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
137        None => Ok(None),
138    }
139}
140
141// Decode one non-scalar slot through the exact persisted contract declared by
142// the field model.
143fn decode_non_scalar_slot_value(
144    raw_value: &[u8],
145    field: &FieldModel,
146) -> Result<Value, InternalError> {
147    let decoded = match field.storage_decode() {
148        crate::model::field::FieldStorageDecode::ByKind => {
149            decode_structural_field_by_kind_bytes(raw_value, field.kind())
150        }
151        crate::model::field::FieldStorageDecode::Value => {
152            decode_structural_value_storage_bytes(raw_value)
153        }
154    };
155
156    decoded.map_err(|err| {
157        InternalError::serialize_corruption(format!(
158            "row decode failed for field '{}' kind={:?}: {err}",
159            field.name(),
160            field.kind(),
161        ))
162    })
163}
164
165///
166/// ScalarValueRef
167///
168/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
169/// slot-reader fast path.
170/// It preserves cheap references for text/blob payloads while keeping fixed
171/// width scalar wrappers as copy values.
172///
173
174#[derive(Clone, Copy, Debug)]
175pub enum ScalarValueRef<'a> {
176    Blob(&'a [u8]),
177    Bool(bool),
178    Date(Date),
179    Duration(Duration),
180    Float32(Float32),
181    Float64(Float64),
182    Int(i64),
183    Principal(Principal),
184    Subaccount(Subaccount),
185    Text(&'a str),
186    Timestamp(Timestamp),
187    Uint(u64),
188    Ulid(Ulid),
189    Unit,
190}
191
192impl ScalarValueRef<'_> {
193    /// Materialize this scalar view into the runtime `Value` enum.
194    #[must_use]
195    pub fn into_value(self) -> Value {
196        match self {
197            Self::Blob(value) => Value::Blob(value.to_vec()),
198            Self::Bool(value) => Value::Bool(value),
199            Self::Date(value) => Value::Date(value),
200            Self::Duration(value) => Value::Duration(value),
201            Self::Float32(value) => Value::Float32(value),
202            Self::Float64(value) => Value::Float64(value),
203            Self::Int(value) => Value::Int(value),
204            Self::Principal(value) => Value::Principal(value),
205            Self::Subaccount(value) => Value::Subaccount(value),
206            Self::Text(value) => Value::Text(value.to_owned()),
207            Self::Timestamp(value) => Value::Timestamp(value),
208            Self::Uint(value) => Value::Uint(value),
209            Self::Ulid(value) => Value::Ulid(value),
210            Self::Unit => Value::Unit,
211        }
212    }
213}
214
215///
216/// ScalarSlotValueRef
217///
218/// ScalarSlotValueRef preserves the distinction between a missing slot and an
219/// explicitly persisted `NULL` scalar payload.
220/// The outer `Option` from `SlotReader::get_scalar` therefore still means
221/// "slot absent".
222///
223
224#[derive(Clone, Copy, Debug)]
225pub enum ScalarSlotValueRef<'a> {
226    Null,
227    Value(ScalarValueRef<'a>),
228}
229
230///
231/// PersistedScalar
232///
233/// PersistedScalar defines the canonical binary payload codec for one scalar
234/// leaf type.
235/// Derive-generated persisted-row materializers and writers use this trait to
236/// avoid routing scalar fields back through CBOR.
237///
238
239pub trait PersistedScalar: Sized {
240    /// Canonical scalar codec identifier used by schema/runtime metadata.
241    const CODEC: ScalarCodec;
242
243    /// Encode this scalar value into its codec-specific payload bytes.
244    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;
245
246    /// Decode this scalar value from its codec-specific payload bytes.
247    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
248    -> Result<Self, InternalError>;
249}
250
251/// Encode one persisted slot payload using the shared leaf codec boundary.
252pub fn encode_persisted_slot_payload<T>(
253    value: &T,
254    field_name: &'static str,
255) -> Result<Vec<u8>, InternalError>
256where
257    T: serde::Serialize,
258{
259    serialize(value).map_err(|err| {
260        InternalError::serialize_internal(format!(
261            "row encode failed for field '{field_name}': {err}",
262        ))
263    })
264}
265
266/// Encode one persisted scalar slot payload using the canonical scalar envelope.
267pub fn encode_persisted_scalar_slot_payload<T>(
268    value: &T,
269    field_name: &'static str,
270) -> Result<Vec<u8>, InternalError>
271where
272    T: PersistedScalar,
273{
274    let payload = value.encode_scalar_payload()?;
275    let mut encoded = Vec::with_capacity(payload.len() + 2);
276    encoded.push(SCALAR_SLOT_PREFIX);
277    encoded.push(SCALAR_SLOT_TAG_VALUE);
278    encoded.extend_from_slice(&payload);
279
280    if encoded.len() < 2 {
281        return Err(InternalError::serialize_internal(format!(
282            "row encode failed for field '{field_name}': scalar payload envelope underflow",
283        )));
284    }
285
286    Ok(encoded)
287}
288
289/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
290pub fn encode_persisted_option_scalar_slot_payload<T>(
291    value: &Option<T>,
292    field_name: &'static str,
293) -> Result<Vec<u8>, InternalError>
294where
295    T: PersistedScalar,
296{
297    match value {
298        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
299        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
300    }
301}
302
303/// Decode one persisted slot payload using the shared leaf codec boundary.
304pub fn decode_persisted_slot_payload<T>(
305    bytes: &[u8],
306    field_name: &'static str,
307) -> Result<T, InternalError>
308where
309    T: serde::de::DeserializeOwned,
310{
311    deserialize(bytes).map_err(|err| {
312        InternalError::serialize_corruption(format!(
313            "row decode failed for field '{field_name}': {err}",
314        ))
315    })
316}
317
318/// Decode one persisted scalar slot payload using the canonical scalar envelope.
319pub fn decode_persisted_scalar_slot_payload<T>(
320    bytes: &[u8],
321    field_name: &'static str,
322) -> Result<T, InternalError>
323where
324    T: PersistedScalar,
325{
326    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
327        InternalError::serialize_corruption(format!(
328            "row decode failed for field '{field_name}': unexpected null for non-nullable scalar field",
329        ))
330    })?;
331
332    T::decode_scalar_payload(payload, field_name)
333}
334
335/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
336pub fn decode_persisted_option_scalar_slot_payload<T>(
337    bytes: &[u8],
338    field_name: &'static str,
339) -> Result<Option<T>, InternalError>
340where
341    T: PersistedScalar,
342{
343    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
344        return Ok(None);
345    };
346
347    T::decode_scalar_payload(payload, field_name).map(Some)
348}
349
350///
351/// SlotBufferWriter
352///
353/// SlotBufferWriter captures one row worth of slot payloads before they are
354/// encoded into the canonical slot container.
355///
356
357pub(in crate::db) struct SlotBufferWriter {
358    slots: Vec<Option<Vec<u8>>>,
359}
360
361impl SlotBufferWriter {
362    /// Build one empty slot buffer for one entity model.
363    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
364        Self {
365            slots: vec![None; model.fields().len()],
366        }
367    }
368
369    /// Encode the buffered slots into the canonical row payload.
370    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
371        let field_count = u16::try_from(self.slots.len()).map_err(|_| {
372            InternalError::serialize_internal(format!(
373                "row encode failed: field count {} exceeds u16 slot table capacity",
374                self.slots.len(),
375            ))
376        })?;
377        let mut payload_bytes = Vec::new();
378        let mut slot_table = Vec::with_capacity(self.slots.len());
379
380        // Phase 1: assign each present slot one payload span inside the data section.
381        for slot_payload in self.slots {
382            match slot_payload {
383                Some(bytes) => {
384                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
385                        InternalError::serialize_internal(
386                            "row encode failed: slot payload start exceeds u32 range",
387                        )
388                    })?;
389                    let len = u32::try_from(bytes.len()).map_err(|_| {
390                        InternalError::serialize_internal(
391                            "row encode failed: slot payload length exceeds u32 range",
392                        )
393                    })?;
394                    payload_bytes.extend_from_slice(&bytes);
395                    slot_table.push((start, len));
396                }
397                None => slot_table.push((0, 0)),
398            }
399        }
400
401        // Phase 2: write the fixed-width slot header followed by concatenated payloads.
402        let mut encoded = Vec::with_capacity(
403            usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
404        );
405        encoded.extend_from_slice(&field_count.to_be_bytes());
406        for (start, len) in slot_table {
407            encoded.extend_from_slice(&start.to_be_bytes());
408            encoded.extend_from_slice(&len.to_be_bytes());
409        }
410        encoded.extend_from_slice(&payload_bytes);
411
412        Ok(encoded)
413    }
414}
415
416impl SlotWriter for SlotBufferWriter {
417    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
418        let entry = self.slots.get_mut(slot).ok_or_else(|| {
419            InternalError::serialize_internal(format!(
420                "row encode failed: slot {slot} is outside the row layout",
421            ))
422        })?;
423        *entry = payload.map(<[u8]>::to_vec);
424
425        Ok(())
426    }
427}
428
429/// Encode one entity into the canonical slot-container payload.
430pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
431where
432    E: PersistedRow,
433{
434    let mut writer = SlotBufferWriter::for_model(E::MODEL);
435    entity.write_slots(&mut writer)?;
436    writer.finish()
437}
438
439///
440/// StructuralSlotReader
441///
442/// StructuralSlotReader adapts the current persisted-row bytes into the
443/// canonical slot-reader seam.
444/// It caches decoded field values lazily so repeated index/predicate reads do
445/// not re-run the same field decoder within one row planning pass.
446///
447
448pub(in crate::db) struct StructuralSlotReader<'a> {
449    model: &'static EntityModel,
450    field_bytes: StructuralRowFieldBytes<'a>,
451    cached_values: Vec<CachedSlotValue>,
452}
453
454impl<'a> StructuralSlotReader<'a> {
455    /// Build one slot reader over one persisted row using the current structural row scanner.
456    pub(in crate::db) fn from_raw_row(
457        raw_row: &'a RawRow,
458        model: &'static EntityModel,
459    ) -> Result<Self, InternalError> {
460        let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
461            .map_err(StructuralRowDecodeError::into_internal_error)?;
462        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
463            .take(model.fields().len())
464            .collect();
465
466        Ok(Self {
467            model,
468            field_bytes,
469            cached_values,
470        })
471    }
472
473    /// Validate the decoded primary-key slot against the authoritative row key.
474    pub(in crate::db) fn validate_storage_key(
475        &self,
476        data_key: &DataKey,
477    ) -> Result<(), InternalError> {
478        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
479            return Err(InternalError::index_invariant(format!(
480                "entity primary key field missing during structural row validation: {}",
481                self.model.path(),
482            )));
483        };
484        let field = self.field_model(primary_key_slot)?;
485        let decoded_key = match self.get_scalar(primary_key_slot)? {
486            Some(ScalarSlotValueRef::Null) => None,
487            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
488            None => match self.field_bytes.field(primary_key_slot) {
489                Some(raw_value) => Some(
490                    decode_storage_key_field_bytes(raw_value, field.kind).map_err(|err| {
491                        InternalError::serialize_corruption(format!(
492                            "row decode failed: primary-key value is not storage-key encodable: {data_key} ({err})",
493                        ))
494                    })?,
495                ),
496                None => None,
497            },
498        };
499        let Some(decoded_key) = decoded_key else {
500            return Err(InternalError::serialize_corruption(format!(
501                "row decode failed: missing primary-key slot while validating {data_key}",
502            )));
503        };
504        let expected_key = data_key.storage_key();
505
506        if decoded_key != expected_key {
507            return Err(InternalError::store_corruption(format!(
508                "row key mismatch: expected {expected_key}, found {decoded_key}",
509            )));
510        }
511
512        Ok(())
513    }
514
515    // Resolve one field model entry by stable slot index.
516    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
517        self.model.fields().get(slot).ok_or_else(|| {
518            InternalError::index_invariant(format!(
519                "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
520                self.model.path(),
521            ))
522        })
523    }
524}
525
526// Convert one scalar slot fast-path value into its storage-key form when the
527// field kind is storage-key-compatible.
528const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
529    match value {
530        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
531        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
532        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
533        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
534        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
535        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
536        ScalarValueRef::Unit => Some(StorageKey::Unit),
537        _ => None,
538    }
539}
540
541impl SlotReader for StructuralSlotReader<'_> {
542    fn model(&self) -> &'static EntityModel {
543        self.model
544    }
545
546    fn has(&self, slot: usize) -> bool {
547        self.field_bytes.field(slot).is_some()
548    }
549
550    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
551        self.field_bytes.field(slot)
552    }
553
554    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
555        let field = self.field_model(slot)?;
556        let Some(raw_value) = self.field_bytes.field(slot) else {
557            return Ok(None);
558        };
559
560        match field.leaf_codec() {
561            LeafCodec::Scalar(codec) => {
562                decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
563            }
564            LeafCodec::CborFallback => Ok(None),
565        }
566    }
567
568    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
569        let cached = self.cached_values.get(slot).ok_or_else(|| {
570            InternalError::index_invariant(format!(
571                "slot cache lookup outside model bounds during structural row access: model='{}' slot={slot}",
572                self.model.path(),
573            ))
574        })?;
575        if let CachedSlotValue::Decoded(value) = cached {
576            return Ok(value.clone());
577        }
578
579        let field = self.field_model(slot)?;
580        let value = match self.get_scalar(slot)? {
581            Some(ScalarSlotValueRef::Null) => Some(Value::Null),
582            Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
583            None => match self.field_bytes.field(slot) {
584                Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
585                None => None,
586            },
587        };
588        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
589
590        Ok(value)
591    }
592}
593
594///
595/// CachedSlotValue
596///
597/// CachedSlotValue tracks whether one slot has already been decoded during the
598/// current structural row access pass.
599///
600
601enum CachedSlotValue {
602    Pending,
603    Decoded(Option<Value>),
604}
605
606// Encode one scalar slot value into the canonical prefixed scalar envelope.
607fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
608    match value {
609        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
610        ScalarSlotValueRef::Value(value) => {
611            let mut encoded = Vec::new();
612            encoded.push(SCALAR_SLOT_PREFIX);
613            encoded.push(SCALAR_SLOT_TAG_VALUE);
614
615            match value {
616                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
617                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
618                ScalarValueRef::Date(value) => {
619                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
620                }
621                ScalarValueRef::Duration(value) => {
622                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
623                }
624                ScalarValueRef::Float32(value) => {
625                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
626                }
627                ScalarValueRef::Float64(value) => {
628                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
629                }
630                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
631                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
632                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
633                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
634                ScalarValueRef::Timestamp(value) => {
635                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
636                }
637                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
638                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
639                ScalarValueRef::Unit => {}
640            }
641
642            encoded
643        }
644    }
645}
646
647// Split one scalar slot envelope into `NULL` vs payload bytes.
648fn decode_scalar_slot_payload_body<'a>(
649    bytes: &'a [u8],
650    field_name: &'static str,
651) -> Result<Option<&'a [u8]>, InternalError> {
652    let Some((&prefix, rest)) = bytes.split_first() else {
653        return Err(InternalError::serialize_corruption(format!(
654            "row decode failed for field '{field_name}': empty scalar payload",
655        )));
656    };
657    if prefix != SCALAR_SLOT_PREFIX {
658        return Err(InternalError::serialize_corruption(format!(
659            "row decode failed for field '{field_name}': scalar payload prefix mismatch",
660        )));
661    }
662    let Some((&tag, payload)) = rest.split_first() else {
663        return Err(InternalError::serialize_corruption(format!(
664            "row decode failed for field '{field_name}': truncated scalar payload tag",
665        )));
666    };
667
668    match tag {
669        SCALAR_SLOT_TAG_NULL => {
670            if !payload.is_empty() {
671                return Err(InternalError::serialize_corruption(format!(
672                    "row decode failed for field '{field_name}': null scalar payload has trailing bytes",
673                )));
674            }
675
676            Ok(None)
677        }
678        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
679        _ => Err(InternalError::serialize_corruption(format!(
680            "row decode failed for field '{field_name}': invalid scalar payload tag {tag}",
681        ))),
682    }
683}
684
685// Decode one scalar slot view using the field-declared scalar codec.
686#[expect(clippy::too_many_lines)]
687fn decode_scalar_slot_value<'a>(
688    bytes: &'a [u8],
689    codec: ScalarCodec,
690    field_name: &'static str,
691) -> Result<ScalarSlotValueRef<'a>, InternalError> {
692    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
693        return Ok(ScalarSlotValueRef::Null);
694    };
695
696    let value = match codec {
697        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
698        ScalarCodec::Bool => {
699            let [value] = payload else {
700                return Err(InternalError::serialize_corruption(format!(
701                    "row decode failed for field '{field_name}': bool payload must be exactly {SCALAR_BOOL_PAYLOAD_LEN} byte",
702                )));
703            };
704            match *value {
705                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
706                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
707                _ => {
708                    return Err(InternalError::serialize_corruption(format!(
709                        "row decode failed for field '{field_name}': invalid bool payload byte {value}",
710                    )));
711                }
712            }
713        }
714        ScalarCodec::Date => {
715            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
716                InternalError::serialize_corruption(format!(
717                    "row decode failed for field '{field_name}': date payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
718                ))
719            })?;
720            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
721        }
722        ScalarCodec::Duration => {
723            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
724                InternalError::serialize_corruption(format!(
725                    "row decode failed for field '{field_name}': duration payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
726                ))
727            })?;
728            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
729        }
730        ScalarCodec::Float32 => {
731            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
732                InternalError::serialize_corruption(format!(
733                    "row decode failed for field '{field_name}': float32 payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
734                ))
735            })?;
736            let value = f32::from_bits(u32::from_le_bytes(bytes));
737            let value = Float32::try_new(value).ok_or_else(|| {
738                InternalError::serialize_corruption(format!(
739                    "row decode failed for field '{field_name}': float32 payload is non-finite",
740                ))
741            })?;
742            ScalarValueRef::Float32(value)
743        }
744        ScalarCodec::Float64 => {
745            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
746                InternalError::serialize_corruption(format!(
747                    "row decode failed for field '{field_name}': float64 payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
748                ))
749            })?;
750            let value = f64::from_bits(u64::from_le_bytes(bytes));
751            let value = Float64::try_new(value).ok_or_else(|| {
752                InternalError::serialize_corruption(format!(
753                    "row decode failed for field '{field_name}': float64 payload is non-finite",
754                ))
755            })?;
756            ScalarValueRef::Float64(value)
757        }
758        ScalarCodec::Int64 => {
759            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
760                InternalError::serialize_corruption(format!(
761                    "row decode failed for field '{field_name}': int payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
762                ))
763            })?;
764            ScalarValueRef::Int(i64::from_le_bytes(bytes))
765        }
766        ScalarCodec::Principal => {
767            ScalarValueRef::Principal(Principal::try_from_bytes(payload).map_err(|err| {
768                InternalError::serialize_corruption(format!(
769                    "row decode failed for field '{field_name}': {err}",
770                ))
771            })?)
772        }
773        ScalarCodec::Subaccount => {
774            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
775                InternalError::serialize_corruption(format!(
776                    "row decode failed for field '{field_name}': subaccount payload must be exactly {SCALAR_SUBACCOUNT_PAYLOAD_LEN} bytes",
777                ))
778            })?;
779            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
780        }
781        ScalarCodec::Text => {
782            let value = str::from_utf8(payload).map_err(|err| {
783                InternalError::serialize_corruption(format!(
784                    "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
785                ))
786            })?;
787            ScalarValueRef::Text(value)
788        }
789        ScalarCodec::Timestamp => {
790            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
791                InternalError::serialize_corruption(format!(
792                    "row decode failed for field '{field_name}': timestamp payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
793                ))
794            })?;
795            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
796        }
797        ScalarCodec::Uint64 => {
798            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
799                InternalError::serialize_corruption(format!(
800                    "row decode failed for field '{field_name}': uint payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
801                ))
802            })?;
803            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
804        }
805        ScalarCodec::Ulid => {
806            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
807                InternalError::serialize_corruption(format!(
808                    "row decode failed for field '{field_name}': ulid payload must be exactly {SCALAR_ULID_PAYLOAD_LEN} bytes",
809                ))
810            })?;
811            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
812        }
813        ScalarCodec::Unit => {
814            if !payload.is_empty() {
815                return Err(InternalError::serialize_corruption(format!(
816                    "row decode failed for field '{field_name}': unit payload must be empty",
817                )));
818            }
819            ScalarValueRef::Unit
820        }
821    };
822
823    Ok(ScalarSlotValueRef::Value(value))
824}
825
/// Generate `PersistedScalar` impls for integer types that convert into `i64`
/// through `From`; every listed type shares the `ScalarCodec::Int64` codec.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                // Widen to i64 first so every signed width persists as the
                // same fixed-size little-endian word.
                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(Vec::from(widened.to_le_bytes()))
                }

                // Read one little-endian i64 word, then narrow it back to the
                // target type; both a wrong length and an out-of-range value
                // are reported as corruption.
                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(word) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': int payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                        )));
                    };
                    let widened = i64::from_le_bytes(word);

                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': integer payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
855
/// Generate `PersistedScalar` impls for integer types that convert into `u64`
/// through `From`; every listed type shares the `ScalarCodec::Uint64` codec.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                // Widen to u64 first so every unsigned width persists as the
                // same fixed-size little-endian word.
                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(Vec::from(widened.to_le_bytes()))
                }

                // Read one little-endian u64 word, then narrow it back to the
                // target type; both a wrong length and an out-of-range value
                // are reported as corruption.
                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(word) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': uint payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                        )));
                    };
                    let widened = u64::from_le_bytes(word);

                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': unsigned payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
885
886impl_persisted_scalar_signed!(i8, i16, i32, i64);
887impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
888
889impl PersistedScalar for bool {
890    const CODEC: ScalarCodec = ScalarCodec::Bool;
891
892    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
893        Ok(vec![u8::from(*self)])
894    }
895
896    fn decode_scalar_payload(
897        bytes: &[u8],
898        field_name: &'static str,
899    ) -> Result<Self, InternalError> {
900        let [value] = bytes else {
901            return Err(InternalError::serialize_corruption(format!(
902                "row decode failed for field '{field_name}': bool payload must be exactly {SCALAR_BOOL_PAYLOAD_LEN} byte",
903            )));
904        };
905
906        match *value {
907            SCALAR_BOOL_FALSE_TAG => Ok(false),
908            SCALAR_BOOL_TRUE_TAG => Ok(true),
909            _ => Err(InternalError::serialize_corruption(format!(
910                "row decode failed for field '{field_name}': invalid bool payload byte {value}",
911            ))),
912        }
913    }
914}
915
916impl PersistedScalar for String {
917    const CODEC: ScalarCodec = ScalarCodec::Text;
918
919    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
920        Ok(self.as_bytes().to_vec())
921    }
922
923    fn decode_scalar_payload(
924        bytes: &[u8],
925        field_name: &'static str,
926    ) -> Result<Self, InternalError> {
927        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
928            InternalError::serialize_corruption(format!(
929                "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
930            ))
931        })
932    }
933}
934
935impl PersistedScalar for Vec<u8> {
936    const CODEC: ScalarCodec = ScalarCodec::Blob;
937
938    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
939        Ok(self.clone())
940    }
941
942    fn decode_scalar_payload(
943        bytes: &[u8],
944        _field_name: &'static str,
945    ) -> Result<Self, InternalError> {
946        Ok(bytes.to_vec())
947    }
948}
949
950impl PersistedScalar for Blob {
951    const CODEC: ScalarCodec = ScalarCodec::Blob;
952
953    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
954        Ok(self.to_vec())
955    }
956
957    fn decode_scalar_payload(
958        bytes: &[u8],
959        _field_name: &'static str,
960    ) -> Result<Self, InternalError> {
961        Ok(Self::from(bytes))
962    }
963}
964
965impl PersistedScalar for Ulid {
966    const CODEC: ScalarCodec = ScalarCodec::Ulid;
967
968    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
969        Ok(self.to_bytes().to_vec())
970    }
971
972    fn decode_scalar_payload(
973        bytes: &[u8],
974        field_name: &'static str,
975    ) -> Result<Self, InternalError> {
976        Self::try_from_bytes(bytes).map_err(|err| {
977            InternalError::serialize_corruption(format!(
978                "row decode failed for field '{field_name}': {err}",
979            ))
980        })
981    }
982}
983
984impl PersistedScalar for Timestamp {
985    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
986
987    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
988        Ok(self.as_millis().to_le_bytes().to_vec())
989    }
990
991    fn decode_scalar_payload(
992        bytes: &[u8],
993        field_name: &'static str,
994    ) -> Result<Self, InternalError> {
995        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
996            InternalError::serialize_corruption(format!(
997                "row decode failed for field '{field_name}': timestamp payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
998            ))
999        })?;
1000
1001        Ok(Self::from_millis(i64::from_le_bytes(raw)))
1002    }
1003}
1004
1005impl PersistedScalar for Date {
1006    const CODEC: ScalarCodec = ScalarCodec::Date;
1007
1008    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1009        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1010    }
1011
1012    fn decode_scalar_payload(
1013        bytes: &[u8],
1014        field_name: &'static str,
1015    ) -> Result<Self, InternalError> {
1016        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1017            InternalError::serialize_corruption(format!(
1018                "row decode failed for field '{field_name}': date payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
1019            ))
1020        })?;
1021
1022        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1023    }
1024}
1025
1026impl PersistedScalar for Duration {
1027    const CODEC: ScalarCodec = ScalarCodec::Duration;
1028
1029    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1030        Ok(self.as_millis().to_le_bytes().to_vec())
1031    }
1032
1033    fn decode_scalar_payload(
1034        bytes: &[u8],
1035        field_name: &'static str,
1036    ) -> Result<Self, InternalError> {
1037        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1038            InternalError::serialize_corruption(format!(
1039                "row decode failed for field '{field_name}': duration payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1040            ))
1041        })?;
1042
1043        Ok(Self::from_millis(u64::from_le_bytes(raw)))
1044    }
1045}
1046
1047impl PersistedScalar for Float32 {
1048    const CODEC: ScalarCodec = ScalarCodec::Float32;
1049
1050    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1051        Ok(self.get().to_bits().to_le_bytes().to_vec())
1052    }
1053
1054    fn decode_scalar_payload(
1055        bytes: &[u8],
1056        field_name: &'static str,
1057    ) -> Result<Self, InternalError> {
1058        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1059            InternalError::serialize_corruption(format!(
1060                "row decode failed for field '{field_name}': float32 payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
1061            ))
1062        })?;
1063        let value = f32::from_bits(u32::from_le_bytes(raw));
1064
1065        Self::try_new(value).ok_or_else(|| {
1066            InternalError::serialize_corruption(format!(
1067                "row decode failed for field '{field_name}': float32 payload is non-finite",
1068            ))
1069        })
1070    }
1071}
1072
1073impl PersistedScalar for Float64 {
1074    const CODEC: ScalarCodec = ScalarCodec::Float64;
1075
1076    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1077        Ok(self.get().to_bits().to_le_bytes().to_vec())
1078    }
1079
1080    fn decode_scalar_payload(
1081        bytes: &[u8],
1082        field_name: &'static str,
1083    ) -> Result<Self, InternalError> {
1084        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1085            InternalError::serialize_corruption(format!(
1086                "row decode failed for field '{field_name}': float64 payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1087            ))
1088        })?;
1089        let value = f64::from_bits(u64::from_le_bytes(raw));
1090
1091        Self::try_new(value).ok_or_else(|| {
1092            InternalError::serialize_corruption(format!(
1093                "row decode failed for field '{field_name}': float64 payload is non-finite",
1094            ))
1095        })
1096    }
1097}
1098
1099impl PersistedScalar for Principal {
1100    const CODEC: ScalarCodec = ScalarCodec::Principal;
1101
1102    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1103        self.to_bytes().map_err(|err| {
1104            InternalError::serialize_internal(format!(
1105                "row encode failed for principal field: {err}",
1106            ))
1107        })
1108    }
1109
1110    fn decode_scalar_payload(
1111        bytes: &[u8],
1112        field_name: &'static str,
1113    ) -> Result<Self, InternalError> {
1114        Self::try_from_bytes(bytes).map_err(|err| {
1115            InternalError::serialize_corruption(format!(
1116                "row decode failed for field '{field_name}': {err}",
1117            ))
1118        })
1119    }
1120}
1121
1122impl PersistedScalar for Subaccount {
1123    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1124
1125    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1126        Ok(self.to_bytes().to_vec())
1127    }
1128
1129    fn decode_scalar_payload(
1130        bytes: &[u8],
1131        field_name: &'static str,
1132    ) -> Result<Self, InternalError> {
1133        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1134            InternalError::serialize_corruption(format!(
1135                "row decode failed for field '{field_name}': subaccount payload must be exactly {SCALAR_SUBACCOUNT_PAYLOAD_LEN} bytes",
1136            ))
1137        })?;
1138
1139        Ok(Self::from_array(raw))
1140    }
1141}
1142
1143impl PersistedScalar for () {
1144    const CODEC: ScalarCodec = ScalarCodec::Unit;
1145
1146    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1147        Ok(Vec::new())
1148    }
1149
1150    fn decode_scalar_payload(
1151        bytes: &[u8],
1152        field_name: &'static str,
1153    ) -> Result<Self, InternalError> {
1154        if !bytes.is_empty() {
1155            return Err(InternalError::serialize_corruption(format!(
1156                "row decode failed for field '{field_name}': unit payload must be empty",
1157            )));
1158        }
1159
1160        Ok(())
1161    }
1162}