Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::data::{
9        DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10        decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
11        decode_structural_value_storage_bytes,
12    },
13    error::InternalError,
14    model::{
15        entity::{EntityModel, resolve_primary_key_slot},
16        field::{FieldModel, LeafCodec, ScalarCodec},
17    },
18    serialize::{deserialize, serialize},
19    traits::EntityKind,
20    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
21    value::{StorageKey, Value},
22};
23use std::str;
24
25const SCALAR_SLOT_PREFIX: u8 = 0xFF;
26const SCALAR_SLOT_TAG_NULL: u8 = 0;
27const SCALAR_SLOT_TAG_VALUE: u8 = 1;
28
29///
30/// SlotReader
31///
32/// SlotReader exposes one persisted row as stable slot-addressable fields.
33/// Callers may inspect field presence, borrow raw field bytes, or decode one
34/// field value on demand.
35///
36
37pub trait SlotReader {
38    /// Return the structural model that owns this slot mapping.
39    fn model(&self) -> &'static EntityModel;
40
41    /// Return whether the given slot is present in the persisted row.
42    fn has(&self, slot: usize) -> bool;
43
44    /// Borrow the raw persisted payload for one slot when present.
45    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
46
47    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
48    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
49
50    /// Decode one slot value on demand using the field contract declared by the model.
51    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
52}
53
54///
55/// SlotWriter
56///
57/// SlotWriter is the canonical row-container output seam used by persisted-row
58/// writers.
59///
60
61pub trait SlotWriter {
62    /// Record one slot payload for the current row.
63    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
64
65    /// Record one scalar slot payload using the canonical scalar leaf envelope.
66    fn write_scalar(
67        &mut self,
68        slot: usize,
69        value: ScalarSlotValueRef<'_>,
70    ) -> Result<(), InternalError> {
71        let payload = encode_scalar_slot_value(value);
72
73        self.write_slot(slot, Some(payload.as_slice()))
74    }
75}
76
77///
78/// PersistedRow
79///
80/// PersistedRow is the derive-owned bridge between typed entities and
81/// slot-addressable persisted rows.
82/// It owns entity-specific materialization/default semantics while runtime
83/// paths stay structural at the row boundary.
84///
85
86pub trait PersistedRow: EntityKind + Sized {
87    /// Materialize one typed entity from one slot reader.
88    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
89
90    /// Write one typed entity into one slot writer.
91    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
92
93    /// Decode one slot value needed by structural planner/projection consumers.
94    fn project_slot(
95        slots: &mut dyn SlotReader,
96        slot: usize,
97    ) -> Result<Option<Value>, InternalError>;
98}
99
100/// Decode one slot value through the declared field contract without routing
101/// through `SlotReader::get_value`.
102pub(in crate::db) fn decode_slot_value_by_contract(
103    slots: &dyn SlotReader,
104    slot: usize,
105) -> Result<Option<Value>, InternalError> {
106    let field = slots.model().fields().get(slot).ok_or_else(|| {
107        InternalError::index_invariant(format!(
108            "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
109            slots.model().path(),
110        ))
111    })?;
112
113    if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
114        && let Some(value) = slots.get_scalar(slot)?
115    {
116        return Ok(Some(match value {
117            ScalarSlotValueRef::Null => Value::Null,
118            ScalarSlotValueRef::Value(value) => value.into_value(),
119        }));
120    }
121
122    match slots.get_bytes(slot) {
123        Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
124        None => Ok(None),
125    }
126}
127
128// Decode one non-scalar slot through the exact persisted contract declared by
129// the field model.
130fn decode_non_scalar_slot_value(
131    raw_value: &[u8],
132    field: &FieldModel,
133) -> Result<Value, InternalError> {
134    let decoded = match field.storage_decode() {
135        crate::model::field::FieldStorageDecode::ByKind => {
136            decode_structural_field_by_kind_bytes(raw_value, field.kind())
137        }
138        crate::model::field::FieldStorageDecode::Value => {
139            decode_structural_value_storage_bytes(raw_value)
140        }
141    };
142
143    decoded.map_err(|err| {
144        InternalError::serialize_corruption(format!(
145            "row decode failed for field '{}' kind={:?}: {err}",
146            field.name(),
147            field.kind(),
148        ))
149    })
150}
151
152///
153/// ScalarValueRef
154///
155/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
156/// slot-reader fast path.
157/// It preserves cheap references for text/blob payloads while keeping fixed
158/// width scalar wrappers as copy values.
159///
160
161#[derive(Clone, Copy, Debug)]
162pub enum ScalarValueRef<'a> {
163    Blob(&'a [u8]),
164    Bool(bool),
165    Date(Date),
166    Duration(Duration),
167    Float32(Float32),
168    Float64(Float64),
169    Int(i64),
170    Principal(Principal),
171    Subaccount(Subaccount),
172    Text(&'a str),
173    Timestamp(Timestamp),
174    Uint(u64),
175    Ulid(Ulid),
176    Unit,
177}
178
179impl ScalarValueRef<'_> {
180    /// Materialize this scalar view into the runtime `Value` enum.
181    #[must_use]
182    pub fn into_value(self) -> Value {
183        match self {
184            Self::Blob(value) => Value::Blob(value.to_vec()),
185            Self::Bool(value) => Value::Bool(value),
186            Self::Date(value) => Value::Date(value),
187            Self::Duration(value) => Value::Duration(value),
188            Self::Float32(value) => Value::Float32(value),
189            Self::Float64(value) => Value::Float64(value),
190            Self::Int(value) => Value::Int(value),
191            Self::Principal(value) => Value::Principal(value),
192            Self::Subaccount(value) => Value::Subaccount(value),
193            Self::Text(value) => Value::Text(value.to_owned()),
194            Self::Timestamp(value) => Value::Timestamp(value),
195            Self::Uint(value) => Value::Uint(value),
196            Self::Ulid(value) => Value::Ulid(value),
197            Self::Unit => Value::Unit,
198        }
199    }
200}
201
202///
203/// ScalarSlotValueRef
204///
205/// ScalarSlotValueRef preserves the distinction between a missing slot and an
206/// explicitly persisted `NULL` scalar payload.
207/// The outer `Option` from `SlotReader::get_scalar` therefore still means
208/// "slot absent".
209///
210
211#[derive(Clone, Copy, Debug)]
212pub enum ScalarSlotValueRef<'a> {
213    Null,
214    Value(ScalarValueRef<'a>),
215}
216
217///
218/// PersistedScalar
219///
220/// PersistedScalar defines the canonical binary payload codec for one scalar
221/// leaf type.
222/// Derive-generated persisted-row materializers and writers use this trait to
223/// avoid routing scalar fields back through CBOR.
224///
225
226pub trait PersistedScalar: Sized {
227    /// Canonical scalar codec identifier used by schema/runtime metadata.
228    const CODEC: ScalarCodec;
229
230    /// Encode this scalar value into its codec-specific payload bytes.
231    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;
232
233    /// Decode this scalar value from its codec-specific payload bytes.
234    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
235    -> Result<Self, InternalError>;
236}
237
238/// Encode one persisted slot payload using the shared leaf codec boundary.
239pub fn encode_persisted_slot_payload<T>(
240    value: &T,
241    field_name: &'static str,
242) -> Result<Vec<u8>, InternalError>
243where
244    T: serde::Serialize,
245{
246    serialize(value).map_err(|err| {
247        InternalError::serialize_internal(format!(
248            "row encode failed for field '{field_name}': {err}",
249        ))
250    })
251}
252
253/// Encode one persisted scalar slot payload using the canonical scalar envelope.
254pub fn encode_persisted_scalar_slot_payload<T>(
255    value: &T,
256    field_name: &'static str,
257) -> Result<Vec<u8>, InternalError>
258where
259    T: PersistedScalar,
260{
261    let payload = value.encode_scalar_payload()?;
262    let mut encoded = Vec::with_capacity(payload.len() + 2);
263    encoded.push(SCALAR_SLOT_PREFIX);
264    encoded.push(SCALAR_SLOT_TAG_VALUE);
265    encoded.extend_from_slice(&payload);
266
267    if encoded.len() < 2 {
268        return Err(InternalError::serialize_internal(format!(
269            "row encode failed for field '{field_name}': scalar payload envelope underflow",
270        )));
271    }
272
273    Ok(encoded)
274}
275
276/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
277pub fn encode_persisted_option_scalar_slot_payload<T>(
278    value: &Option<T>,
279    field_name: &'static str,
280) -> Result<Vec<u8>, InternalError>
281where
282    T: PersistedScalar,
283{
284    match value {
285        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
286        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
287    }
288}
289
290/// Decode one persisted slot payload using the shared leaf codec boundary.
291pub fn decode_persisted_slot_payload<T>(
292    bytes: &[u8],
293    field_name: &'static str,
294) -> Result<T, InternalError>
295where
296    T: serde::de::DeserializeOwned,
297{
298    deserialize(bytes).map_err(|err| {
299        InternalError::serialize_corruption(format!(
300            "row decode failed for field '{field_name}': {err}",
301        ))
302    })
303}
304
305/// Decode one persisted scalar slot payload using the canonical scalar envelope.
306pub fn decode_persisted_scalar_slot_payload<T>(
307    bytes: &[u8],
308    field_name: &'static str,
309) -> Result<T, InternalError>
310where
311    T: PersistedScalar,
312{
313    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
314        InternalError::serialize_corruption(format!(
315            "row decode failed for field '{field_name}': unexpected null for non-nullable scalar field",
316        ))
317    })?;
318
319    T::decode_scalar_payload(payload, field_name)
320}
321
322/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
323pub fn decode_persisted_option_scalar_slot_payload<T>(
324    bytes: &[u8],
325    field_name: &'static str,
326) -> Result<Option<T>, InternalError>
327where
328    T: PersistedScalar,
329{
330    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
331        return Ok(None);
332    };
333
334    T::decode_scalar_payload(payload, field_name).map(Some)
335}
336
337/// Build the canonical missing-field error for persisted-row materialization.
338#[must_use]
339pub fn missing_persisted_slot_error(field_name: &'static str) -> InternalError {
340    InternalError::serialize_corruption(format!(
341        "row decode failed: missing required field '{field_name}'",
342    ))
343}
344
345///
346/// SlotBufferWriter
347///
348/// SlotBufferWriter captures one row worth of slot payloads before they are
349/// encoded into the canonical slot container.
350///
351
352pub(in crate::db) struct SlotBufferWriter {
353    slots: Vec<Option<Vec<u8>>>,
354}
355
356impl SlotBufferWriter {
357    /// Build one empty slot buffer for one entity model.
358    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
359        Self {
360            slots: vec![None; model.fields().len()],
361        }
362    }
363
364    /// Encode the buffered slots into the canonical row payload.
365    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
366        let field_count = u16::try_from(self.slots.len()).map_err(|_| {
367            InternalError::serialize_internal(format!(
368                "row encode failed: field count {} exceeds u16 slot table capacity",
369                self.slots.len(),
370            ))
371        })?;
372        let mut payload_bytes = Vec::new();
373        let mut slot_table = Vec::with_capacity(self.slots.len());
374
375        // Phase 1: assign each present slot one payload span inside the data section.
376        for slot_payload in self.slots {
377            match slot_payload {
378                Some(bytes) => {
379                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
380                        InternalError::serialize_internal(
381                            "row encode failed: slot payload start exceeds u32 range",
382                        )
383                    })?;
384                    let len = u32::try_from(bytes.len()).map_err(|_| {
385                        InternalError::serialize_internal(
386                            "row encode failed: slot payload length exceeds u32 range",
387                        )
388                    })?;
389                    payload_bytes.extend_from_slice(&bytes);
390                    slot_table.push((start, len));
391                }
392                None => slot_table.push((0, 0)),
393            }
394        }
395
396        // Phase 2: write the fixed-width slot header followed by concatenated payloads.
397        let mut encoded = Vec::with_capacity(
398            usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
399        );
400        encoded.extend_from_slice(&field_count.to_be_bytes());
401        for (start, len) in slot_table {
402            encoded.extend_from_slice(&start.to_be_bytes());
403            encoded.extend_from_slice(&len.to_be_bytes());
404        }
405        encoded.extend_from_slice(&payload_bytes);
406
407        Ok(encoded)
408    }
409}
410
411impl SlotWriter for SlotBufferWriter {
412    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
413        let entry = self.slots.get_mut(slot).ok_or_else(|| {
414            InternalError::serialize_internal(format!(
415                "row encode failed: slot {slot} is outside the row layout",
416            ))
417        })?;
418        *entry = payload.map(<[u8]>::to_vec);
419
420        Ok(())
421    }
422}
423
424/// Encode one entity into the canonical slot-container payload.
425pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
426where
427    E: PersistedRow,
428{
429    let mut writer = SlotBufferWriter::for_model(E::MODEL);
430    entity.write_slots(&mut writer)?;
431    writer.finish()
432}
433
434///
435/// StructuralSlotReader
436///
437/// StructuralSlotReader adapts the current persisted-row bytes into the
438/// canonical slot-reader seam.
439/// It caches decoded field values lazily so repeated index/predicate reads do
440/// not re-run the same field decoder within one row planning pass.
441///
442
443pub(in crate::db) struct StructuralSlotReader<'a> {
444    model: &'static EntityModel,
445    field_bytes: StructuralRowFieldBytes<'a>,
446    cached_values: Vec<CachedSlotValue>,
447}
448
449impl<'a> StructuralSlotReader<'a> {
450    /// Build one slot reader over one persisted row using the current structural row scanner.
451    pub(in crate::db) fn from_raw_row(
452        raw_row: &'a RawRow,
453        model: &'static EntityModel,
454    ) -> Result<Self, InternalError> {
455        let field_bytes =
456            StructuralRowFieldBytes::from_raw_row(raw_row, model).map_err(|err| match err {
457                StructuralRowDecodeError::Deserialize(source) => source,
458            })?;
459        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
460            .take(model.fields().len())
461            .collect();
462
463        Ok(Self {
464            model,
465            field_bytes,
466            cached_values,
467        })
468    }
469
470    /// Validate the decoded primary-key slot against the authoritative row key.
471    pub(in crate::db) fn validate_storage_key(
472        &self,
473        data_key: &DataKey,
474    ) -> Result<(), InternalError> {
475        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
476            return Err(InternalError::index_invariant(format!(
477                "entity primary key field missing during structural row validation: {}",
478                self.model.path(),
479            )));
480        };
481        let field = self.field_model(primary_key_slot)?;
482        let decoded_key = match self.get_scalar(primary_key_slot)? {
483            Some(ScalarSlotValueRef::Null) => None,
484            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
485            None => match self.field_bytes.field(primary_key_slot) {
486                Some(raw_value) => Some(
487                    decode_storage_key_field_bytes(raw_value, field.kind).map_err(|err| {
488                        InternalError::serialize_corruption(format!(
489                            "row decode failed: primary-key value is not storage-key encodable: {data_key} ({err})",
490                        ))
491                    })?,
492                ),
493                None => None,
494            },
495        };
496        let Some(decoded_key) = decoded_key else {
497            return Err(InternalError::serialize_corruption(format!(
498                "row decode failed: missing primary-key slot while validating {data_key}",
499            )));
500        };
501        let expected_key = data_key.storage_key();
502
503        if decoded_key != expected_key {
504            return Err(InternalError::store_corruption(format!(
505                "row key mismatch: expected {expected_key}, found {decoded_key}",
506            )));
507        }
508
509        Ok(())
510    }
511
512    // Resolve one field model entry by stable slot index.
513    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
514        self.model.fields().get(slot).ok_or_else(|| {
515            InternalError::index_invariant(format!(
516                "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
517                self.model.path(),
518            ))
519        })
520    }
521}
522
523// Convert one scalar slot fast-path value into its storage-key form when the
524// field kind is storage-key-compatible.
525const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
526    match value {
527        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
528        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
529        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
530        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
531        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
532        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
533        ScalarValueRef::Unit => Some(StorageKey::Unit),
534        _ => None,
535    }
536}
537
538impl SlotReader for StructuralSlotReader<'_> {
539    fn model(&self) -> &'static EntityModel {
540        self.model
541    }
542
543    fn has(&self, slot: usize) -> bool {
544        self.field_bytes.field(slot).is_some()
545    }
546
547    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
548        self.field_bytes.field(slot)
549    }
550
551    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
552        let field = self.field_model(slot)?;
553        let Some(raw_value) = self.field_bytes.field(slot) else {
554            return Ok(None);
555        };
556
557        match field.leaf_codec() {
558            LeafCodec::Scalar(codec) => {
559                decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
560            }
561            LeafCodec::CborFallback => Ok(None),
562        }
563    }
564
565    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
566        let cached = self.cached_values.get(slot).ok_or_else(|| {
567            InternalError::index_invariant(format!(
568                "slot cache lookup outside model bounds during structural row access: model='{}' slot={slot}",
569                self.model.path(),
570            ))
571        })?;
572        if let CachedSlotValue::Decoded(value) = cached {
573            return Ok(value.clone());
574        }
575
576        let field = self.field_model(slot)?;
577        let value = match self.get_scalar(slot)? {
578            Some(ScalarSlotValueRef::Null) => Some(Value::Null),
579            Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
580            None => match self.field_bytes.field(slot) {
581                Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
582                None => None,
583            },
584        };
585        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
586
587        Ok(value)
588    }
589}
590
591///
592/// CachedSlotValue
593///
594/// CachedSlotValue tracks whether one slot has already been decoded during the
595/// current structural row access pass.
596///
597
598enum CachedSlotValue {
599    Pending,
600    Decoded(Option<Value>),
601}
602
603// Encode one scalar slot value into the canonical prefixed scalar envelope.
604fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
605    match value {
606        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
607        ScalarSlotValueRef::Value(value) => {
608            let mut encoded = Vec::new();
609            encoded.push(SCALAR_SLOT_PREFIX);
610            encoded.push(SCALAR_SLOT_TAG_VALUE);
611
612            match value {
613                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
614                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
615                ScalarValueRef::Date(value) => {
616                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
617                }
618                ScalarValueRef::Duration(value) => {
619                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
620                }
621                ScalarValueRef::Float32(value) => {
622                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
623                }
624                ScalarValueRef::Float64(value) => {
625                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
626                }
627                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
628                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
629                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
630                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
631                ScalarValueRef::Timestamp(value) => {
632                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
633                }
634                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
635                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
636                ScalarValueRef::Unit => {}
637            }
638
639            encoded
640        }
641    }
642}
643
644// Split one scalar slot envelope into `NULL` vs payload bytes.
645fn decode_scalar_slot_payload_body<'a>(
646    bytes: &'a [u8],
647    field_name: &'static str,
648) -> Result<Option<&'a [u8]>, InternalError> {
649    let Some((&prefix, rest)) = bytes.split_first() else {
650        return Err(InternalError::serialize_corruption(format!(
651            "row decode failed for field '{field_name}': empty scalar payload",
652        )));
653    };
654    if prefix != SCALAR_SLOT_PREFIX {
655        return Err(InternalError::serialize_corruption(format!(
656            "row decode failed for field '{field_name}': scalar payload prefix mismatch",
657        )));
658    }
659    let Some((&tag, payload)) = rest.split_first() else {
660        return Err(InternalError::serialize_corruption(format!(
661            "row decode failed for field '{field_name}': truncated scalar payload tag",
662        )));
663    };
664
665    match tag {
666        SCALAR_SLOT_TAG_NULL => {
667            if !payload.is_empty() {
668                return Err(InternalError::serialize_corruption(format!(
669                    "row decode failed for field '{field_name}': null scalar payload has trailing bytes",
670                )));
671            }
672
673            Ok(None)
674        }
675        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
676        _ => Err(InternalError::serialize_corruption(format!(
677            "row decode failed for field '{field_name}': invalid scalar payload tag {tag}",
678        ))),
679    }
680}
681
682// Decode one scalar slot view using the field-declared scalar codec.
683#[expect(clippy::too_many_lines)]
684fn decode_scalar_slot_value<'a>(
685    bytes: &'a [u8],
686    codec: ScalarCodec,
687    field_name: &'static str,
688) -> Result<ScalarSlotValueRef<'a>, InternalError> {
689    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
690        return Ok(ScalarSlotValueRef::Null);
691    };
692
693    let value = match codec {
694        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
695        ScalarCodec::Bool => {
696            let [value] = payload else {
697                return Err(InternalError::serialize_corruption(format!(
698                    "row decode failed for field '{field_name}': bool payload must be exactly 1 byte",
699                )));
700            };
701            match value {
702                0 => ScalarValueRef::Bool(false),
703                1 => ScalarValueRef::Bool(true),
704                _ => {
705                    return Err(InternalError::serialize_corruption(format!(
706                        "row decode failed for field '{field_name}': invalid bool payload byte {value}",
707                    )));
708                }
709            }
710        }
711        ScalarCodec::Date => {
712            let bytes: [u8; 4] = payload.try_into().map_err(|_| {
713                InternalError::serialize_corruption(format!(
714                    "row decode failed for field '{field_name}': date payload must be exactly 4 bytes",
715                ))
716            })?;
717            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
718        }
719        ScalarCodec::Duration => {
720            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
721                InternalError::serialize_corruption(format!(
722                    "row decode failed for field '{field_name}': duration payload must be exactly 8 bytes",
723                ))
724            })?;
725            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
726        }
727        ScalarCodec::Float32 => {
728            let bytes: [u8; 4] = payload.try_into().map_err(|_| {
729                InternalError::serialize_corruption(format!(
730                    "row decode failed for field '{field_name}': float32 payload must be exactly 4 bytes",
731                ))
732            })?;
733            let value = f32::from_bits(u32::from_le_bytes(bytes));
734            let value = Float32::try_new(value).ok_or_else(|| {
735                InternalError::serialize_corruption(format!(
736                    "row decode failed for field '{field_name}': float32 payload is non-finite",
737                ))
738            })?;
739            ScalarValueRef::Float32(value)
740        }
741        ScalarCodec::Float64 => {
742            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
743                InternalError::serialize_corruption(format!(
744                    "row decode failed for field '{field_name}': float64 payload must be exactly 8 bytes",
745                ))
746            })?;
747            let value = f64::from_bits(u64::from_le_bytes(bytes));
748            let value = Float64::try_new(value).ok_or_else(|| {
749                InternalError::serialize_corruption(format!(
750                    "row decode failed for field '{field_name}': float64 payload is non-finite",
751                ))
752            })?;
753            ScalarValueRef::Float64(value)
754        }
755        ScalarCodec::Int64 => {
756            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
757                InternalError::serialize_corruption(format!(
758                    "row decode failed for field '{field_name}': int payload must be exactly 8 bytes",
759                ))
760            })?;
761            ScalarValueRef::Int(i64::from_le_bytes(bytes))
762        }
763        ScalarCodec::Principal => {
764            ScalarValueRef::Principal(Principal::try_from_bytes(payload).map_err(|err| {
765                InternalError::serialize_corruption(format!(
766                    "row decode failed for field '{field_name}': {err}",
767                ))
768            })?)
769        }
770        ScalarCodec::Subaccount => {
771            let bytes: [u8; 32] = payload.try_into().map_err(|_| {
772                InternalError::serialize_corruption(format!(
773                    "row decode failed for field '{field_name}': subaccount payload must be exactly 32 bytes",
774                ))
775            })?;
776            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
777        }
778        ScalarCodec::Text => {
779            let value = str::from_utf8(payload).map_err(|err| {
780                InternalError::serialize_corruption(format!(
781                    "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
782                ))
783            })?;
784            ScalarValueRef::Text(value)
785        }
786        ScalarCodec::Timestamp => {
787            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
788                InternalError::serialize_corruption(format!(
789                    "row decode failed for field '{field_name}': timestamp payload must be exactly 8 bytes",
790                ))
791            })?;
792            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
793        }
794        ScalarCodec::Uint64 => {
795            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
796                InternalError::serialize_corruption(format!(
797                    "row decode failed for field '{field_name}': uint payload must be exactly 8 bytes",
798                ))
799            })?;
800            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
801        }
802        ScalarCodec::Ulid => {
803            let bytes: [u8; 16] = payload.try_into().map_err(|_| {
804                InternalError::serialize_corruption(format!(
805                    "row decode failed for field '{field_name}': ulid payload must be exactly 16 bytes",
806                ))
807            })?;
808            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
809        }
810        ScalarCodec::Unit => {
811            if !payload.is_empty() {
812                return Err(InternalError::serialize_corruption(format!(
813                    "row decode failed for field '{field_name}': unit payload must be empty",
814                )));
815            }
816            ScalarValueRef::Unit
817        }
818    };
819
820    Ok(ScalarSlotValueRef::Value(value))
821}
822
823macro_rules! impl_persisted_scalar_signed {
824    ($($ty:ty),* $(,)?) => {
825        $(
826            impl PersistedScalar for $ty {
827                const CODEC: ScalarCodec = ScalarCodec::Int64;
828
829                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
830                    Ok(i64::from(*self).to_le_bytes().to_vec())
831                }
832
833                fn decode_scalar_payload(
834                    bytes: &[u8],
835                    field_name: &'static str,
836                ) -> Result<Self, InternalError> {
837                    let raw: [u8; 8] = bytes.try_into().map_err(|_| {
838                        InternalError::serialize_corruption(format!(
839                            "row decode failed for field '{field_name}': int payload must be exactly 8 bytes",
840                        ))
841                    })?;
842                    <$ty>::try_from(i64::from_le_bytes(raw)).map_err(|_| {
843                        InternalError::serialize_corruption(format!(
844                            "row decode failed for field '{field_name}': integer payload out of range for target type",
845                        ))
846                    })
847                }
848            }
849        )*
850    };
851}
852
853macro_rules! impl_persisted_scalar_unsigned {
854    ($($ty:ty),* $(,)?) => {
855        $(
856            impl PersistedScalar for $ty {
857                const CODEC: ScalarCodec = ScalarCodec::Uint64;
858
859                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
860                    Ok(u64::from(*self).to_le_bytes().to_vec())
861                }
862
863                fn decode_scalar_payload(
864                    bytes: &[u8],
865                    field_name: &'static str,
866                ) -> Result<Self, InternalError> {
867                    let raw: [u8; 8] = bytes.try_into().map_err(|_| {
868                        InternalError::serialize_corruption(format!(
869                            "row decode failed for field '{field_name}': uint payload must be exactly 8 bytes",
870                        ))
871                    })?;
872                    <$ty>::try_from(u64::from_le_bytes(raw)).map_err(|_| {
873                        InternalError::serialize_corruption(format!(
874                            "row decode failed for field '{field_name}': unsigned payload out of range for target type",
875                        ))
876                    })
877                }
878            }
879        )*
880    };
881}
882
883impl_persisted_scalar_signed!(i8, i16, i32, i64);
884impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
885
886impl PersistedScalar for bool {
887    const CODEC: ScalarCodec = ScalarCodec::Bool;
888
889    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
890        Ok(vec![u8::from(*self)])
891    }
892
893    fn decode_scalar_payload(
894        bytes: &[u8],
895        field_name: &'static str,
896    ) -> Result<Self, InternalError> {
897        let [value] = bytes else {
898            return Err(InternalError::serialize_corruption(format!(
899                "row decode failed for field '{field_name}': bool payload must be exactly 1 byte",
900            )));
901        };
902
903        match value {
904            0 => Ok(false),
905            1 => Ok(true),
906            _ => Err(InternalError::serialize_corruption(format!(
907                "row decode failed for field '{field_name}': invalid bool payload byte {value}",
908            ))),
909        }
910    }
911}
912
913impl PersistedScalar for String {
914    const CODEC: ScalarCodec = ScalarCodec::Text;
915
916    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
917        Ok(self.as_bytes().to_vec())
918    }
919
920    fn decode_scalar_payload(
921        bytes: &[u8],
922        field_name: &'static str,
923    ) -> Result<Self, InternalError> {
924        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
925            InternalError::serialize_corruption(format!(
926                "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
927            ))
928        })
929    }
930}
931
932impl PersistedScalar for Vec<u8> {
933    const CODEC: ScalarCodec = ScalarCodec::Blob;
934
935    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
936        Ok(self.clone())
937    }
938
939    fn decode_scalar_payload(
940        bytes: &[u8],
941        _field_name: &'static str,
942    ) -> Result<Self, InternalError> {
943        Ok(bytes.to_vec())
944    }
945}
946
947impl PersistedScalar for Blob {
948    const CODEC: ScalarCodec = ScalarCodec::Blob;
949
950    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
951        Ok(self.to_vec())
952    }
953
954    fn decode_scalar_payload(
955        bytes: &[u8],
956        _field_name: &'static str,
957    ) -> Result<Self, InternalError> {
958        Ok(Self::from(bytes))
959    }
960}
961
962impl PersistedScalar for Ulid {
963    const CODEC: ScalarCodec = ScalarCodec::Ulid;
964
965    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
966        Ok(self.to_bytes().to_vec())
967    }
968
969    fn decode_scalar_payload(
970        bytes: &[u8],
971        field_name: &'static str,
972    ) -> Result<Self, InternalError> {
973        Self::try_from_bytes(bytes).map_err(|err| {
974            InternalError::serialize_corruption(format!(
975                "row decode failed for field '{field_name}': {err}",
976            ))
977        })
978    }
979}
980
981impl PersistedScalar for Timestamp {
982    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
983
984    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
985        Ok(self.as_millis().to_le_bytes().to_vec())
986    }
987
988    fn decode_scalar_payload(
989        bytes: &[u8],
990        field_name: &'static str,
991    ) -> Result<Self, InternalError> {
992        let raw: [u8; 8] = bytes.try_into().map_err(|_| {
993            InternalError::serialize_corruption(format!(
994                "row decode failed for field '{field_name}': timestamp payload must be exactly 8 bytes",
995            ))
996        })?;
997
998        Ok(Self::from_millis(i64::from_le_bytes(raw)))
999    }
1000}
1001
1002impl PersistedScalar for Date {
1003    const CODEC: ScalarCodec = ScalarCodec::Date;
1004
1005    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1006        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1007    }
1008
1009    fn decode_scalar_payload(
1010        bytes: &[u8],
1011        field_name: &'static str,
1012    ) -> Result<Self, InternalError> {
1013        let raw: [u8; 4] = bytes.try_into().map_err(|_| {
1014            InternalError::serialize_corruption(format!(
1015                "row decode failed for field '{field_name}': date payload must be exactly 4 bytes",
1016            ))
1017        })?;
1018
1019        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1020    }
1021}
1022
1023impl PersistedScalar for Duration {
1024    const CODEC: ScalarCodec = ScalarCodec::Duration;
1025
1026    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1027        Ok(self.as_millis().to_le_bytes().to_vec())
1028    }
1029
1030    fn decode_scalar_payload(
1031        bytes: &[u8],
1032        field_name: &'static str,
1033    ) -> Result<Self, InternalError> {
1034        let raw: [u8; 8] = bytes.try_into().map_err(|_| {
1035            InternalError::serialize_corruption(format!(
1036                "row decode failed for field '{field_name}': duration payload must be exactly 8 bytes",
1037            ))
1038        })?;
1039
1040        Ok(Self::from_millis(u64::from_le_bytes(raw)))
1041    }
1042}
1043
1044impl PersistedScalar for Float32 {
1045    const CODEC: ScalarCodec = ScalarCodec::Float32;
1046
1047    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1048        Ok(self.get().to_bits().to_le_bytes().to_vec())
1049    }
1050
1051    fn decode_scalar_payload(
1052        bytes: &[u8],
1053        field_name: &'static str,
1054    ) -> Result<Self, InternalError> {
1055        let raw: [u8; 4] = bytes.try_into().map_err(|_| {
1056            InternalError::serialize_corruption(format!(
1057                "row decode failed for field '{field_name}': float32 payload must be exactly 4 bytes",
1058            ))
1059        })?;
1060        let value = f32::from_bits(u32::from_le_bytes(raw));
1061
1062        Self::try_new(value).ok_or_else(|| {
1063            InternalError::serialize_corruption(format!(
1064                "row decode failed for field '{field_name}': float32 payload is non-finite",
1065            ))
1066        })
1067    }
1068}
1069
1070impl PersistedScalar for Float64 {
1071    const CODEC: ScalarCodec = ScalarCodec::Float64;
1072
1073    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1074        Ok(self.get().to_bits().to_le_bytes().to_vec())
1075    }
1076
1077    fn decode_scalar_payload(
1078        bytes: &[u8],
1079        field_name: &'static str,
1080    ) -> Result<Self, InternalError> {
1081        let raw: [u8; 8] = bytes.try_into().map_err(|_| {
1082            InternalError::serialize_corruption(format!(
1083                "row decode failed for field '{field_name}': float64 payload must be exactly 8 bytes",
1084            ))
1085        })?;
1086        let value = f64::from_bits(u64::from_le_bytes(raw));
1087
1088        Self::try_new(value).ok_or_else(|| {
1089            InternalError::serialize_corruption(format!(
1090                "row decode failed for field '{field_name}': float64 payload is non-finite",
1091            ))
1092        })
1093    }
1094}
1095
1096impl PersistedScalar for Principal {
1097    const CODEC: ScalarCodec = ScalarCodec::Principal;
1098
1099    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1100        self.to_bytes().map_err(|err| {
1101            InternalError::serialize_internal(format!(
1102                "row encode failed for principal field: {err}",
1103            ))
1104        })
1105    }
1106
1107    fn decode_scalar_payload(
1108        bytes: &[u8],
1109        field_name: &'static str,
1110    ) -> Result<Self, InternalError> {
1111        Self::try_from_bytes(bytes).map_err(|err| {
1112            InternalError::serialize_corruption(format!(
1113                "row decode failed for field '{field_name}': {err}",
1114            ))
1115        })
1116    }
1117}
1118
1119impl PersistedScalar for Subaccount {
1120    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1121
1122    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1123        Ok(self.to_bytes().to_vec())
1124    }
1125
1126    fn decode_scalar_payload(
1127        bytes: &[u8],
1128        field_name: &'static str,
1129    ) -> Result<Self, InternalError> {
1130        let raw: [u8; 32] = bytes.try_into().map_err(|_| {
1131            InternalError::serialize_corruption(format!(
1132                "row decode failed for field '{field_name}': subaccount payload must be exactly 32 bytes",
1133            ))
1134        })?;
1135
1136        Ok(Self::from_array(raw))
1137    }
1138}
1139
1140impl PersistedScalar for () {
1141    const CODEC: ScalarCodec = ScalarCodec::Unit;
1142
1143    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1144        Ok(Vec::new())
1145    }
1146
1147    fn decode_scalar_payload(
1148        bytes: &[u8],
1149        field_name: &'static str,
1150    ) -> Result<Self, InternalError> {
1151        if !bytes.is_empty() {
1152            return Err(InternalError::serialize_corruption(format!(
1153                "row decode failed for field '{field_name}': unit payload must be empty",
1154            )));
1155        }
1156
1157        Ok(())
1158    }
1159}