Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::data::{
9        DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10        decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
11        decode_structural_value_storage_bytes,
12    },
13    error::InternalError,
14    model::{
15        entity::{EntityModel, resolve_primary_key_slot},
16        field::{FieldModel, LeafCodec, ScalarCodec},
17    },
18    serialize::{deserialize, serialize},
19    traits::EntityKind,
20    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
21    value::{StorageKey, Value},
22};
23use std::str;
24
// Scalar slot envelope: every scalar payload starts with the prefix byte,
// followed by one tag byte that separates `NULL` from a present value.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
const SCALAR_SLOT_TAG_NULL: u8 = 0x00;
const SCALAR_SLOT_TAG_VALUE: u8 = 0x01;

// Exact payload widths, in bytes, enforced when decoding fixed-width scalars.
const SCALAR_BOOL_PAYLOAD_LEN: usize = 1;
const SCALAR_WORD32_PAYLOAD_LEN: usize = 4;
const SCALAR_WORD64_PAYLOAD_LEN: usize = 8;
const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// The only byte values accepted as a persisted bool payload.
const SCALAR_BOOL_FALSE_TAG: u8 = 0x00;
const SCALAR_BOOL_TRUE_TAG: u8 = 0x01;
37
38///
39/// SlotReader
40///
41/// SlotReader exposes one persisted row as stable slot-addressable fields.
42/// Callers may inspect field presence, borrow raw field bytes, or decode one
43/// field value on demand.
44///
45
46pub trait SlotReader {
47    /// Return the structural model that owns this slot mapping.
48    fn model(&self) -> &'static EntityModel;
49
50    /// Return whether the given slot is present in the persisted row.
51    fn has(&self, slot: usize) -> bool;
52
53    /// Borrow the raw persisted payload for one slot when present.
54    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
55
56    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
57    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
58
59    /// Decode one slot value on demand using the field contract declared by the model.
60    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
61}
62
63///
64/// SlotWriter
65///
66/// SlotWriter is the canonical row-container output seam used by persisted-row
67/// writers.
68///
69
70pub trait SlotWriter {
71    /// Record one slot payload for the current row.
72    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
73
74    /// Record one scalar slot payload using the canonical scalar leaf envelope.
75    fn write_scalar(
76        &mut self,
77        slot: usize,
78        value: ScalarSlotValueRef<'_>,
79    ) -> Result<(), InternalError> {
80        let payload = encode_scalar_slot_value(value);
81
82        self.write_slot(slot, Some(payload.as_slice()))
83    }
84}
85
86///
87/// PersistedRow
88///
89/// PersistedRow is the derive-owned bridge between typed entities and
90/// slot-addressable persisted rows.
91/// It owns entity-specific materialization/default semantics while runtime
92/// paths stay structural at the row boundary.
93///
94
95pub trait PersistedRow: EntityKind + Sized {
96    /// Materialize one typed entity from one slot reader.
97    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
98
99    /// Write one typed entity into one slot writer.
100    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
101
102    /// Decode one slot value needed by structural planner/projection consumers.
103    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
104    where
105        Self: crate::traits::FieldProjection,
106    {
107        let entity = Self::materialize_from_slots(slots)?;
108
109        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
110    }
111}
112
113/// Decode one slot value through the declared field contract without routing
114/// through `SlotReader::get_value`.
115pub(in crate::db) fn decode_slot_value_by_contract(
116    slots: &dyn SlotReader,
117    slot: usize,
118) -> Result<Option<Value>, InternalError> {
119    let field = slots.model().fields().get(slot).ok_or_else(|| {
120        InternalError::index_invariant(format!(
121            "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
122            slots.model().path(),
123        ))
124    })?;
125
126    if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
127        && let Some(value) = slots.get_scalar(slot)?
128    {
129        return Ok(Some(match value {
130            ScalarSlotValueRef::Null => Value::Null,
131            ScalarSlotValueRef::Value(value) => value.into_value(),
132        }));
133    }
134
135    match slots.get_bytes(slot) {
136        Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
137        None => Ok(None),
138    }
139}
140
141// Decode one non-scalar slot through the exact persisted contract declared by
142// the field model.
143fn decode_non_scalar_slot_value(
144    raw_value: &[u8],
145    field: &FieldModel,
146) -> Result<Value, InternalError> {
147    let decoded = match field.storage_decode() {
148        crate::model::field::FieldStorageDecode::ByKind => {
149            decode_structural_field_by_kind_bytes(raw_value, field.kind())
150        }
151        crate::model::field::FieldStorageDecode::Value => {
152            decode_structural_value_storage_bytes(raw_value)
153        }
154    };
155
156    decoded.map_err(|err| {
157        InternalError::serialize_corruption(format!(
158            "row decode failed for field '{}' kind={:?}: {err}",
159            field.name(),
160            field.kind(),
161        ))
162    })
163}
164
165///
166/// ScalarValueRef
167///
168/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
169/// slot-reader fast path.
170/// It preserves cheap references for text/blob payloads while keeping fixed
171/// width scalar wrappers as copy values.
172///
173
174#[derive(Clone, Copy, Debug)]
175pub enum ScalarValueRef<'a> {
176    Blob(&'a [u8]),
177    Bool(bool),
178    Date(Date),
179    Duration(Duration),
180    Float32(Float32),
181    Float64(Float64),
182    Int(i64),
183    Principal(Principal),
184    Subaccount(Subaccount),
185    Text(&'a str),
186    Timestamp(Timestamp),
187    Uint(u64),
188    Ulid(Ulid),
189    Unit,
190}
191
192impl ScalarValueRef<'_> {
193    /// Materialize this scalar view into the runtime `Value` enum.
194    #[must_use]
195    pub fn into_value(self) -> Value {
196        match self {
197            Self::Blob(value) => Value::Blob(value.to_vec()),
198            Self::Bool(value) => Value::Bool(value),
199            Self::Date(value) => Value::Date(value),
200            Self::Duration(value) => Value::Duration(value),
201            Self::Float32(value) => Value::Float32(value),
202            Self::Float64(value) => Value::Float64(value),
203            Self::Int(value) => Value::Int(value),
204            Self::Principal(value) => Value::Principal(value),
205            Self::Subaccount(value) => Value::Subaccount(value),
206            Self::Text(value) => Value::Text(value.to_owned()),
207            Self::Timestamp(value) => Value::Timestamp(value),
208            Self::Uint(value) => Value::Uint(value),
209            Self::Ulid(value) => Value::Ulid(value),
210            Self::Unit => Value::Unit,
211        }
212    }
213}
214
215///
216/// ScalarSlotValueRef
217///
218/// ScalarSlotValueRef preserves the distinction between a missing slot and an
219/// explicitly persisted `NULL` scalar payload.
220/// The outer `Option` from `SlotReader::get_scalar` therefore still means
221/// "slot absent".
222///
223
224#[derive(Clone, Copy, Debug)]
225pub enum ScalarSlotValueRef<'a> {
226    Null,
227    Value(ScalarValueRef<'a>),
228}
229
230///
231/// PersistedScalar
232///
233/// PersistedScalar defines the canonical binary payload codec for one scalar
234/// leaf type.
235/// Derive-generated persisted-row materializers and writers use this trait to
236/// avoid routing scalar fields back through CBOR.
237///
238
239pub trait PersistedScalar: Sized {
240    /// Canonical scalar codec identifier used by schema/runtime metadata.
241    const CODEC: ScalarCodec;
242
243    /// Encode this scalar value into its codec-specific payload bytes.
244    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;
245
246    /// Decode this scalar value from its codec-specific payload bytes.
247    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
248    -> Result<Self, InternalError>;
249}
250
251/// Encode one persisted slot payload using the shared leaf codec boundary.
252pub fn encode_persisted_slot_payload<T>(
253    value: &T,
254    field_name: &'static str,
255) -> Result<Vec<u8>, InternalError>
256where
257    T: serde::Serialize,
258{
259    serialize(value).map_err(|err| {
260        InternalError::serialize_internal(format!(
261            "row encode failed for field '{field_name}': {err}",
262        ))
263    })
264}
265
266/// Encode one persisted scalar slot payload using the canonical scalar envelope.
267pub fn encode_persisted_scalar_slot_payload<T>(
268    value: &T,
269    field_name: &'static str,
270) -> Result<Vec<u8>, InternalError>
271where
272    T: PersistedScalar,
273{
274    let payload = value.encode_scalar_payload()?;
275    let mut encoded = Vec::with_capacity(payload.len() + 2);
276    encoded.push(SCALAR_SLOT_PREFIX);
277    encoded.push(SCALAR_SLOT_TAG_VALUE);
278    encoded.extend_from_slice(&payload);
279
280    if encoded.len() < 2 {
281        return Err(InternalError::serialize_internal(format!(
282            "row encode failed for field '{field_name}': scalar payload envelope underflow",
283        )));
284    }
285
286    Ok(encoded)
287}
288
289/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
290pub fn encode_persisted_option_scalar_slot_payload<T>(
291    value: &Option<T>,
292    field_name: &'static str,
293) -> Result<Vec<u8>, InternalError>
294where
295    T: PersistedScalar,
296{
297    match value {
298        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
299        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
300    }
301}
302
303/// Decode one persisted slot payload using the shared leaf codec boundary.
304pub fn decode_persisted_slot_payload<T>(
305    bytes: &[u8],
306    field_name: &'static str,
307) -> Result<T, InternalError>
308where
309    T: serde::de::DeserializeOwned,
310{
311    deserialize(bytes).map_err(|err| {
312        InternalError::serialize_corruption(format!(
313            "row decode failed for field '{field_name}': {err}",
314        ))
315    })
316}
317
318/// Decode one persisted scalar slot payload using the canonical scalar envelope.
319pub fn decode_persisted_scalar_slot_payload<T>(
320    bytes: &[u8],
321    field_name: &'static str,
322) -> Result<T, InternalError>
323where
324    T: PersistedScalar,
325{
326    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
327        InternalError::serialize_corruption(format!(
328            "row decode failed for field '{field_name}': unexpected null for non-nullable scalar field",
329        ))
330    })?;
331
332    T::decode_scalar_payload(payload, field_name)
333}
334
335/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
336pub fn decode_persisted_option_scalar_slot_payload<T>(
337    bytes: &[u8],
338    field_name: &'static str,
339) -> Result<Option<T>, InternalError>
340where
341    T: PersistedScalar,
342{
343    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
344        return Ok(None);
345    };
346
347    T::decode_scalar_payload(payload, field_name).map(Some)
348}
349
350/// Build the canonical missing-field error for persisted-row materialization.
351#[must_use]
352pub fn missing_persisted_slot_error(field_name: &'static str) -> InternalError {
353    InternalError::serialize_corruption(format!(
354        "row decode failed: missing required field '{field_name}'",
355    ))
356}
357
358///
359/// SlotBufferWriter
360///
361/// SlotBufferWriter captures one row worth of slot payloads before they are
362/// encoded into the canonical slot container.
363///
364
365pub(in crate::db) struct SlotBufferWriter {
366    slots: Vec<Option<Vec<u8>>>,
367}
368
369impl SlotBufferWriter {
370    /// Build one empty slot buffer for one entity model.
371    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
372        Self {
373            slots: vec![None; model.fields().len()],
374        }
375    }
376
377    /// Encode the buffered slots into the canonical row payload.
378    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
379        let field_count = u16::try_from(self.slots.len()).map_err(|_| {
380            InternalError::serialize_internal(format!(
381                "row encode failed: field count {} exceeds u16 slot table capacity",
382                self.slots.len(),
383            ))
384        })?;
385        let mut payload_bytes = Vec::new();
386        let mut slot_table = Vec::with_capacity(self.slots.len());
387
388        // Phase 1: assign each present slot one payload span inside the data section.
389        for slot_payload in self.slots {
390            match slot_payload {
391                Some(bytes) => {
392                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
393                        InternalError::serialize_internal(
394                            "row encode failed: slot payload start exceeds u32 range",
395                        )
396                    })?;
397                    let len = u32::try_from(bytes.len()).map_err(|_| {
398                        InternalError::serialize_internal(
399                            "row encode failed: slot payload length exceeds u32 range",
400                        )
401                    })?;
402                    payload_bytes.extend_from_slice(&bytes);
403                    slot_table.push((start, len));
404                }
405                None => slot_table.push((0, 0)),
406            }
407        }
408
409        // Phase 2: write the fixed-width slot header followed by concatenated payloads.
410        let mut encoded = Vec::with_capacity(
411            usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
412        );
413        encoded.extend_from_slice(&field_count.to_be_bytes());
414        for (start, len) in slot_table {
415            encoded.extend_from_slice(&start.to_be_bytes());
416            encoded.extend_from_slice(&len.to_be_bytes());
417        }
418        encoded.extend_from_slice(&payload_bytes);
419
420        Ok(encoded)
421    }
422}
423
424impl SlotWriter for SlotBufferWriter {
425    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
426        let entry = self.slots.get_mut(slot).ok_or_else(|| {
427            InternalError::serialize_internal(format!(
428                "row encode failed: slot {slot} is outside the row layout",
429            ))
430        })?;
431        *entry = payload.map(<[u8]>::to_vec);
432
433        Ok(())
434    }
435}
436
437/// Encode one entity into the canonical slot-container payload.
438pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
439where
440    E: PersistedRow,
441{
442    let mut writer = SlotBufferWriter::for_model(E::MODEL);
443    entity.write_slots(&mut writer)?;
444    writer.finish()
445}
446
447///
448/// StructuralSlotReader
449///
450/// StructuralSlotReader adapts the current persisted-row bytes into the
451/// canonical slot-reader seam.
452/// It caches decoded field values lazily so repeated index/predicate reads do
453/// not re-run the same field decoder within one row planning pass.
454///
455
456pub(in crate::db) struct StructuralSlotReader<'a> {
457    model: &'static EntityModel,
458    field_bytes: StructuralRowFieldBytes<'a>,
459    cached_values: Vec<CachedSlotValue>,
460}
461
462impl<'a> StructuralSlotReader<'a> {
463    /// Build one slot reader over one persisted row using the current structural row scanner.
464    pub(in crate::db) fn from_raw_row(
465        raw_row: &'a RawRow,
466        model: &'static EntityModel,
467    ) -> Result<Self, InternalError> {
468        let field_bytes =
469            StructuralRowFieldBytes::from_raw_row(raw_row, model).map_err(|err| match err {
470                StructuralRowDecodeError::Deserialize(source) => source,
471            })?;
472        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
473            .take(model.fields().len())
474            .collect();
475
476        Ok(Self {
477            model,
478            field_bytes,
479            cached_values,
480        })
481    }
482
483    /// Validate the decoded primary-key slot against the authoritative row key.
484    pub(in crate::db) fn validate_storage_key(
485        &self,
486        data_key: &DataKey,
487    ) -> Result<(), InternalError> {
488        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
489            return Err(InternalError::index_invariant(format!(
490                "entity primary key field missing during structural row validation: {}",
491                self.model.path(),
492            )));
493        };
494        let field = self.field_model(primary_key_slot)?;
495        let decoded_key = match self.get_scalar(primary_key_slot)? {
496            Some(ScalarSlotValueRef::Null) => None,
497            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
498            None => match self.field_bytes.field(primary_key_slot) {
499                Some(raw_value) => Some(
500                    decode_storage_key_field_bytes(raw_value, field.kind).map_err(|err| {
501                        InternalError::serialize_corruption(format!(
502                            "row decode failed: primary-key value is not storage-key encodable: {data_key} ({err})",
503                        ))
504                    })?,
505                ),
506                None => None,
507            },
508        };
509        let Some(decoded_key) = decoded_key else {
510            return Err(InternalError::serialize_corruption(format!(
511                "row decode failed: missing primary-key slot while validating {data_key}",
512            )));
513        };
514        let expected_key = data_key.storage_key();
515
516        if decoded_key != expected_key {
517            return Err(InternalError::store_corruption(format!(
518                "row key mismatch: expected {expected_key}, found {decoded_key}",
519            )));
520        }
521
522        Ok(())
523    }
524
525    // Resolve one field model entry by stable slot index.
526    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
527        self.model.fields().get(slot).ok_or_else(|| {
528            InternalError::index_invariant(format!(
529                "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
530                self.model.path(),
531            ))
532        })
533    }
534}
535
536// Convert one scalar slot fast-path value into its storage-key form when the
537// field kind is storage-key-compatible.
538const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
539    match value {
540        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
541        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
542        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
543        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
544        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
545        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
546        ScalarValueRef::Unit => Some(StorageKey::Unit),
547        _ => None,
548    }
549}
550
551impl SlotReader for StructuralSlotReader<'_> {
552    fn model(&self) -> &'static EntityModel {
553        self.model
554    }
555
556    fn has(&self, slot: usize) -> bool {
557        self.field_bytes.field(slot).is_some()
558    }
559
560    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
561        self.field_bytes.field(slot)
562    }
563
564    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
565        let field = self.field_model(slot)?;
566        let Some(raw_value) = self.field_bytes.field(slot) else {
567            return Ok(None);
568        };
569
570        match field.leaf_codec() {
571            LeafCodec::Scalar(codec) => {
572                decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
573            }
574            LeafCodec::CborFallback => Ok(None),
575        }
576    }
577
578    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
579        let cached = self.cached_values.get(slot).ok_or_else(|| {
580            InternalError::index_invariant(format!(
581                "slot cache lookup outside model bounds during structural row access: model='{}' slot={slot}",
582                self.model.path(),
583            ))
584        })?;
585        if let CachedSlotValue::Decoded(value) = cached {
586            return Ok(value.clone());
587        }
588
589        let field = self.field_model(slot)?;
590        let value = match self.get_scalar(slot)? {
591            Some(ScalarSlotValueRef::Null) => Some(Value::Null),
592            Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
593            None => match self.field_bytes.field(slot) {
594                Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
595                None => None,
596            },
597        };
598        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
599
600        Ok(value)
601    }
602}
603
604///
605/// CachedSlotValue
606///
607/// CachedSlotValue tracks whether one slot has already been decoded during the
608/// current structural row access pass.
609///
610
611enum CachedSlotValue {
612    Pending,
613    Decoded(Option<Value>),
614}
615
616// Encode one scalar slot value into the canonical prefixed scalar envelope.
617fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
618    match value {
619        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
620        ScalarSlotValueRef::Value(value) => {
621            let mut encoded = Vec::new();
622            encoded.push(SCALAR_SLOT_PREFIX);
623            encoded.push(SCALAR_SLOT_TAG_VALUE);
624
625            match value {
626                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
627                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
628                ScalarValueRef::Date(value) => {
629                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
630                }
631                ScalarValueRef::Duration(value) => {
632                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
633                }
634                ScalarValueRef::Float32(value) => {
635                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
636                }
637                ScalarValueRef::Float64(value) => {
638                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
639                }
640                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
641                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
642                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
643                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
644                ScalarValueRef::Timestamp(value) => {
645                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
646                }
647                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
648                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
649                ScalarValueRef::Unit => {}
650            }
651
652            encoded
653        }
654    }
655}
656
657// Split one scalar slot envelope into `NULL` vs payload bytes.
658fn decode_scalar_slot_payload_body<'a>(
659    bytes: &'a [u8],
660    field_name: &'static str,
661) -> Result<Option<&'a [u8]>, InternalError> {
662    let Some((&prefix, rest)) = bytes.split_first() else {
663        return Err(InternalError::serialize_corruption(format!(
664            "row decode failed for field '{field_name}': empty scalar payload",
665        )));
666    };
667    if prefix != SCALAR_SLOT_PREFIX {
668        return Err(InternalError::serialize_corruption(format!(
669            "row decode failed for field '{field_name}': scalar payload prefix mismatch",
670        )));
671    }
672    let Some((&tag, payload)) = rest.split_first() else {
673        return Err(InternalError::serialize_corruption(format!(
674            "row decode failed for field '{field_name}': truncated scalar payload tag",
675        )));
676    };
677
678    match tag {
679        SCALAR_SLOT_TAG_NULL => {
680            if !payload.is_empty() {
681                return Err(InternalError::serialize_corruption(format!(
682                    "row decode failed for field '{field_name}': null scalar payload has trailing bytes",
683                )));
684            }
685
686            Ok(None)
687        }
688        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
689        _ => Err(InternalError::serialize_corruption(format!(
690            "row decode failed for field '{field_name}': invalid scalar payload tag {tag}",
691        ))),
692    }
693}
694
695// Decode one scalar slot view using the field-declared scalar codec.
696#[expect(clippy::too_many_lines)]
697fn decode_scalar_slot_value<'a>(
698    bytes: &'a [u8],
699    codec: ScalarCodec,
700    field_name: &'static str,
701) -> Result<ScalarSlotValueRef<'a>, InternalError> {
702    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
703        return Ok(ScalarSlotValueRef::Null);
704    };
705
706    let value = match codec {
707        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
708        ScalarCodec::Bool => {
709            let [value] = payload else {
710                return Err(InternalError::serialize_corruption(format!(
711                    "row decode failed for field '{field_name}': bool payload must be exactly {SCALAR_BOOL_PAYLOAD_LEN} byte",
712                )));
713            };
714            match *value {
715                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
716                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
717                _ => {
718                    return Err(InternalError::serialize_corruption(format!(
719                        "row decode failed for field '{field_name}': invalid bool payload byte {value}",
720                    )));
721                }
722            }
723        }
724        ScalarCodec::Date => {
725            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
726                InternalError::serialize_corruption(format!(
727                    "row decode failed for field '{field_name}': date payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
728                ))
729            })?;
730            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
731        }
732        ScalarCodec::Duration => {
733            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
734                InternalError::serialize_corruption(format!(
735                    "row decode failed for field '{field_name}': duration payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
736                ))
737            })?;
738            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
739        }
740        ScalarCodec::Float32 => {
741            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
742                InternalError::serialize_corruption(format!(
743                    "row decode failed for field '{field_name}': float32 payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
744                ))
745            })?;
746            let value = f32::from_bits(u32::from_le_bytes(bytes));
747            let value = Float32::try_new(value).ok_or_else(|| {
748                InternalError::serialize_corruption(format!(
749                    "row decode failed for field '{field_name}': float32 payload is non-finite",
750                ))
751            })?;
752            ScalarValueRef::Float32(value)
753        }
754        ScalarCodec::Float64 => {
755            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
756                InternalError::serialize_corruption(format!(
757                    "row decode failed for field '{field_name}': float64 payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
758                ))
759            })?;
760            let value = f64::from_bits(u64::from_le_bytes(bytes));
761            let value = Float64::try_new(value).ok_or_else(|| {
762                InternalError::serialize_corruption(format!(
763                    "row decode failed for field '{field_name}': float64 payload is non-finite",
764                ))
765            })?;
766            ScalarValueRef::Float64(value)
767        }
768        ScalarCodec::Int64 => {
769            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
770                InternalError::serialize_corruption(format!(
771                    "row decode failed for field '{field_name}': int payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
772                ))
773            })?;
774            ScalarValueRef::Int(i64::from_le_bytes(bytes))
775        }
776        ScalarCodec::Principal => {
777            ScalarValueRef::Principal(Principal::try_from_bytes(payload).map_err(|err| {
778                InternalError::serialize_corruption(format!(
779                    "row decode failed for field '{field_name}': {err}",
780                ))
781            })?)
782        }
783        ScalarCodec::Subaccount => {
784            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
785                InternalError::serialize_corruption(format!(
786                    "row decode failed for field '{field_name}': subaccount payload must be exactly {SCALAR_SUBACCOUNT_PAYLOAD_LEN} bytes",
787                ))
788            })?;
789            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
790        }
791        ScalarCodec::Text => {
792            let value = str::from_utf8(payload).map_err(|err| {
793                InternalError::serialize_corruption(format!(
794                    "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
795                ))
796            })?;
797            ScalarValueRef::Text(value)
798        }
799        ScalarCodec::Timestamp => {
800            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
801                InternalError::serialize_corruption(format!(
802                    "row decode failed for field '{field_name}': timestamp payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
803                ))
804            })?;
805            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
806        }
807        ScalarCodec::Uint64 => {
808            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
809                InternalError::serialize_corruption(format!(
810                    "row decode failed for field '{field_name}': uint payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
811                ))
812            })?;
813            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
814        }
815        ScalarCodec::Ulid => {
816            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
817                InternalError::serialize_corruption(format!(
818                    "row decode failed for field '{field_name}': ulid payload must be exactly {SCALAR_ULID_PAYLOAD_LEN} bytes",
819                ))
820            })?;
821            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
822        }
823        ScalarCodec::Unit => {
824            if !payload.is_empty() {
825                return Err(InternalError::serialize_corruption(format!(
826                    "row decode failed for field '{field_name}': unit payload must be empty",
827                )));
828            }
829            ScalarValueRef::Unit
830        }
831    };
832
833    Ok(ScalarSlotValueRef::Value(value))
834}
835
// Generates `PersistedScalar` impls for signed integer primitives.
//
// Every listed type is persisted through `ScalarCodec::Int64`: the value is
// widened to `i64` and written as little-endian bytes. Decoding requires an
// exact `SCALAR_WORD64_PAYLOAD_LEN`-byte payload and reports a value that
// does not fit the target type as corruption.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(Vec::from(widened.to_le_bytes()))
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    // Guard: the payload must be one full 64-bit word.
                    let Ok(word) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': int payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                        )));
                    };
                    let widened = i64::from_le_bytes(word);

                    // Narrow back to the target width; overflow means the row is corrupt.
                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': integer payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
865
// Generates `PersistedScalar` impls for unsigned integer primitives.
//
// Every listed type is persisted through `ScalarCodec::Uint64`: the value is
// widened to `u64` and written as little-endian bytes. Decoding requires an
// exact `SCALAR_WORD64_PAYLOAD_LEN`-byte payload and reports a value that
// does not fit the target type as corruption.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(Vec::from(widened.to_le_bytes()))
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    // Guard: the payload must be one full 64-bit word.
                    let Ok(word) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': uint payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                        )));
                    };
                    let widened = u64::from_le_bytes(word);

                    // Narrow back to the target width; overflow means the row is corrupt.
                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': unsigned payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
895
896impl_persisted_scalar_signed!(i8, i16, i32, i64);
897impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
898
899impl PersistedScalar for bool {
900    const CODEC: ScalarCodec = ScalarCodec::Bool;
901
902    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
903        Ok(vec![u8::from(*self)])
904    }
905
906    fn decode_scalar_payload(
907        bytes: &[u8],
908        field_name: &'static str,
909    ) -> Result<Self, InternalError> {
910        let [value] = bytes else {
911            return Err(InternalError::serialize_corruption(format!(
912                "row decode failed for field '{field_name}': bool payload must be exactly {SCALAR_BOOL_PAYLOAD_LEN} byte",
913            )));
914        };
915
916        match *value {
917            SCALAR_BOOL_FALSE_TAG => Ok(false),
918            SCALAR_BOOL_TRUE_TAG => Ok(true),
919            _ => Err(InternalError::serialize_corruption(format!(
920                "row decode failed for field '{field_name}': invalid bool payload byte {value}",
921            ))),
922        }
923    }
924}
925
926impl PersistedScalar for String {
927    const CODEC: ScalarCodec = ScalarCodec::Text;
928
929    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
930        Ok(self.as_bytes().to_vec())
931    }
932
933    fn decode_scalar_payload(
934        bytes: &[u8],
935        field_name: &'static str,
936    ) -> Result<Self, InternalError> {
937        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
938            InternalError::serialize_corruption(format!(
939                "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
940            ))
941        })
942    }
943}
944
945impl PersistedScalar for Vec<u8> {
946    const CODEC: ScalarCodec = ScalarCodec::Blob;
947
948    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
949        Ok(self.clone())
950    }
951
952    fn decode_scalar_payload(
953        bytes: &[u8],
954        _field_name: &'static str,
955    ) -> Result<Self, InternalError> {
956        Ok(bytes.to_vec())
957    }
958}
959
960impl PersistedScalar for Blob {
961    const CODEC: ScalarCodec = ScalarCodec::Blob;
962
963    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
964        Ok(self.to_vec())
965    }
966
967    fn decode_scalar_payload(
968        bytes: &[u8],
969        _field_name: &'static str,
970    ) -> Result<Self, InternalError> {
971        Ok(Self::from(bytes))
972    }
973}
974
975impl PersistedScalar for Ulid {
976    const CODEC: ScalarCodec = ScalarCodec::Ulid;
977
978    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
979        Ok(self.to_bytes().to_vec())
980    }
981
982    fn decode_scalar_payload(
983        bytes: &[u8],
984        field_name: &'static str,
985    ) -> Result<Self, InternalError> {
986        Self::try_from_bytes(bytes).map_err(|err| {
987            InternalError::serialize_corruption(format!(
988                "row decode failed for field '{field_name}': {err}",
989            ))
990        })
991    }
992}
993
994impl PersistedScalar for Timestamp {
995    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
996
997    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
998        Ok(self.as_millis().to_le_bytes().to_vec())
999    }
1000
1001    fn decode_scalar_payload(
1002        bytes: &[u8],
1003        field_name: &'static str,
1004    ) -> Result<Self, InternalError> {
1005        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1006            InternalError::serialize_corruption(format!(
1007                "row decode failed for field '{field_name}': timestamp payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1008            ))
1009        })?;
1010
1011        Ok(Self::from_millis(i64::from_le_bytes(raw)))
1012    }
1013}
1014
1015impl PersistedScalar for Date {
1016    const CODEC: ScalarCodec = ScalarCodec::Date;
1017
1018    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1019        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1020    }
1021
1022    fn decode_scalar_payload(
1023        bytes: &[u8],
1024        field_name: &'static str,
1025    ) -> Result<Self, InternalError> {
1026        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1027            InternalError::serialize_corruption(format!(
1028                "row decode failed for field '{field_name}': date payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
1029            ))
1030        })?;
1031
1032        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1033    }
1034}
1035
1036impl PersistedScalar for Duration {
1037    const CODEC: ScalarCodec = ScalarCodec::Duration;
1038
1039    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1040        Ok(self.as_millis().to_le_bytes().to_vec())
1041    }
1042
1043    fn decode_scalar_payload(
1044        bytes: &[u8],
1045        field_name: &'static str,
1046    ) -> Result<Self, InternalError> {
1047        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1048            InternalError::serialize_corruption(format!(
1049                "row decode failed for field '{field_name}': duration payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1050            ))
1051        })?;
1052
1053        Ok(Self::from_millis(u64::from_le_bytes(raw)))
1054    }
1055}
1056
1057impl PersistedScalar for Float32 {
1058    const CODEC: ScalarCodec = ScalarCodec::Float32;
1059
1060    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1061        Ok(self.get().to_bits().to_le_bytes().to_vec())
1062    }
1063
1064    fn decode_scalar_payload(
1065        bytes: &[u8],
1066        field_name: &'static str,
1067    ) -> Result<Self, InternalError> {
1068        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1069            InternalError::serialize_corruption(format!(
1070                "row decode failed for field '{field_name}': float32 payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
1071            ))
1072        })?;
1073        let value = f32::from_bits(u32::from_le_bytes(raw));
1074
1075        Self::try_new(value).ok_or_else(|| {
1076            InternalError::serialize_corruption(format!(
1077                "row decode failed for field '{field_name}': float32 payload is non-finite",
1078            ))
1079        })
1080    }
1081}
1082
1083impl PersistedScalar for Float64 {
1084    const CODEC: ScalarCodec = ScalarCodec::Float64;
1085
1086    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1087        Ok(self.get().to_bits().to_le_bytes().to_vec())
1088    }
1089
1090    fn decode_scalar_payload(
1091        bytes: &[u8],
1092        field_name: &'static str,
1093    ) -> Result<Self, InternalError> {
1094        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1095            InternalError::serialize_corruption(format!(
1096                "row decode failed for field '{field_name}': float64 payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1097            ))
1098        })?;
1099        let value = f64::from_bits(u64::from_le_bytes(raw));
1100
1101        Self::try_new(value).ok_or_else(|| {
1102            InternalError::serialize_corruption(format!(
1103                "row decode failed for field '{field_name}': float64 payload is non-finite",
1104            ))
1105        })
1106    }
1107}
1108
1109impl PersistedScalar for Principal {
1110    const CODEC: ScalarCodec = ScalarCodec::Principal;
1111
1112    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1113        self.to_bytes().map_err(|err| {
1114            InternalError::serialize_internal(format!(
1115                "row encode failed for principal field: {err}",
1116            ))
1117        })
1118    }
1119
1120    fn decode_scalar_payload(
1121        bytes: &[u8],
1122        field_name: &'static str,
1123    ) -> Result<Self, InternalError> {
1124        Self::try_from_bytes(bytes).map_err(|err| {
1125            InternalError::serialize_corruption(format!(
1126                "row decode failed for field '{field_name}': {err}",
1127            ))
1128        })
1129    }
1130}
1131
1132impl PersistedScalar for Subaccount {
1133    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1134
1135    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1136        Ok(self.to_bytes().to_vec())
1137    }
1138
1139    fn decode_scalar_payload(
1140        bytes: &[u8],
1141        field_name: &'static str,
1142    ) -> Result<Self, InternalError> {
1143        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1144            InternalError::serialize_corruption(format!(
1145                "row decode failed for field '{field_name}': subaccount payload must be exactly {SCALAR_SUBACCOUNT_PAYLOAD_LEN} bytes",
1146            ))
1147        })?;
1148
1149        Ok(Self::from_array(raw))
1150    }
1151}
1152
1153impl PersistedScalar for () {
1154    const CODEC: ScalarCodec = ScalarCodec::Unit;
1155
1156    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1157        Ok(Vec::new())
1158    }
1159
1160    fn decode_scalar_payload(
1161        bytes: &[u8],
1162        field_name: &'static str,
1163    ) -> Result<Self, InternalError> {
1164        if !bytes.is_empty() {
1165            return Err(InternalError::serialize_corruption(format!(
1166                "row decode failed for field '{field_name}': unit payload must be empty",
1167            )));
1168        }
1169
1170        Ok(())
1171    }
1172}