Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::data::{
9        DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10        decode_structural_field_by_kind_bytes, decode_structural_value_storage_bytes,
11    },
12    error::InternalError,
13    model::{
14        entity::{EntityModel, resolve_primary_key_slot},
15        field::{FieldModel, LeafCodec, ScalarCodec},
16    },
17    serialize::{deserialize, serialize},
18    traits::EntityKind,
19    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
20    value::{StorageKey, Value},
21};
22use std::str;
23
// First byte of every scalar slot envelope; decoding rejects any other prefix.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
// Envelope tag (second byte) for an explicitly persisted `NULL`; no payload bytes may follow.
const SCALAR_SLOT_TAG_NULL: u8 = 0;
// Envelope tag (second byte) for a present scalar; the codec payload follows the tag.
const SCALAR_SLOT_TAG_VALUE: u8 = 1;
27
///
/// SlotReader
///
/// SlotReader exposes one persisted row as stable slot-addressable fields.
/// Callers may inspect field presence, borrow raw field bytes, or decode one
/// field value on demand.
/// A slot is an index into the field list of the model returned by `model`.
///

pub trait SlotReader {
    /// Return the structural model that owns this slot mapping.
    fn model(&self) -> &'static EntityModel;

    /// Return whether the given slot is present in the persisted row.
    fn has(&self, slot: usize) -> bool;

    /// Borrow the raw persisted payload for one slot when present.
    ///
    /// Returns `None` when the row carries no payload for `slot`.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;

    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
    ///
    /// The outer `Option` is `None` when the slot is absent; an explicitly
    /// persisted `NULL` is reported as `Some(ScalarSlotValueRef::Null)`.
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;

    /// Decode one slot value on demand using the field contract declared by the model.
    ///
    /// Takes `&mut self`, which allows an implementation to remember decoded
    /// values between calls.
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
}
52
53///
54/// SlotWriter
55///
56/// SlotWriter is the canonical row-container output seam used by persisted-row
57/// writers.
58///
59
60pub trait SlotWriter {
61    /// Record one slot payload for the current row.
62    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
63
64    /// Record one scalar slot payload using the canonical scalar leaf envelope.
65    fn write_scalar(
66        &mut self,
67        slot: usize,
68        value: ScalarSlotValueRef<'_>,
69    ) -> Result<(), InternalError> {
70        let payload = encode_scalar_slot_value(value);
71
72        self.write_slot(slot, Some(payload.as_slice()))
73    }
74}
75
///
/// PersistedRow
///
/// PersistedRow is the derive-owned bridge between typed entities and
/// slot-addressable persisted rows.
/// It owns entity-specific materialization/default semantics while runtime
/// paths stay structural at the row boundary.
///

pub trait PersistedRow: EntityKind + Sized {
    /// Materialize one typed entity from one slot reader.
    ///
    /// The reader is taken mutably because `SlotReader::get_value` requires
    /// `&mut self`.
    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;

    /// Write one typed entity into one slot writer.
    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;

    /// Decode one slot value needed by structural planner/projection consumers.
    ///
    /// Mirrors the `SlotReader::get_value` shape: the result is an optional
    /// runtime `Value` for the requested slot.
    fn project_slot(
        slots: &mut dyn SlotReader,
        slot: usize,
    ) -> Result<Option<Value>, InternalError>;
}
98
99/// Decode one slot value through the declared field contract without routing
100/// through `SlotReader::get_value`.
101pub(in crate::db) fn decode_slot_value_by_contract(
102    slots: &dyn SlotReader,
103    slot: usize,
104) -> Result<Option<Value>, InternalError> {
105    let field = slots.model().fields().get(slot).ok_or_else(|| {
106        InternalError::index_invariant(format!(
107            "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
108            slots.model().path(),
109        ))
110    })?;
111
112    if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
113        && let Some(value) = slots.get_scalar(slot)?
114    {
115        return Ok(Some(match value {
116            ScalarSlotValueRef::Null => Value::Null,
117            ScalarSlotValueRef::Value(value) => value.into_value(),
118        }));
119    }
120
121    match slots.get_bytes(slot) {
122        Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
123        None => Ok(None),
124    }
125}
126
127// Decode one non-scalar slot through the exact persisted contract declared by
128// the field model.
129fn decode_non_scalar_slot_value(
130    raw_value: &[u8],
131    field: &FieldModel,
132) -> Result<Value, InternalError> {
133    let decoded = match field.storage_decode() {
134        crate::model::field::FieldStorageDecode::ByKind => {
135            decode_structural_field_by_kind_bytes(raw_value, field.kind())
136        }
137        crate::model::field::FieldStorageDecode::Value => {
138            decode_structural_value_storage_bytes(raw_value)
139        }
140    };
141
142    decoded.map_err(|err| {
143        InternalError::serialize_corruption(format!(
144            "row decode failed for field '{}' kind={:?}: {err}",
145            field.name(),
146            field.kind(),
147        ))
148    })
149}
150
///
/// ScalarValueRef
///
/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
/// slot-reader fast path.
/// It preserves cheap references for text/blob payloads while keeping fixed
/// width scalar wrappers as copy values.
/// The whole view is `Copy`; `into_value` turns it into an owned `Value`.
///

#[derive(Clone, Copy, Debug)]
pub enum ScalarValueRef<'a> {
    // Borrowed payload bytes; not copied until materialized.
    Blob(&'a [u8]),
    Bool(bool),
    Date(Date),
    Duration(Duration),
    Float32(Float32),
    Float64(Float64),
    Int(i64),
    Principal(Principal),
    Subaccount(Subaccount),
    // Borrowed UTF-8 text; not copied until materialized.
    Text(&'a str),
    Timestamp(Timestamp),
    Uint(u64),
    Ulid(Ulid),
    Unit,
}
177
178impl ScalarValueRef<'_> {
179    /// Materialize this scalar view into the runtime `Value` enum.
180    #[must_use]
181    pub fn into_value(self) -> Value {
182        match self {
183            Self::Blob(value) => Value::Blob(value.to_vec()),
184            Self::Bool(value) => Value::Bool(value),
185            Self::Date(value) => Value::Date(value),
186            Self::Duration(value) => Value::Duration(value),
187            Self::Float32(value) => Value::Float32(value),
188            Self::Float64(value) => Value::Float64(value),
189            Self::Int(value) => Value::Int(value),
190            Self::Principal(value) => Value::Principal(value),
191            Self::Subaccount(value) => Value::Subaccount(value),
192            Self::Text(value) => Value::Text(value.to_owned()),
193            Self::Timestamp(value) => Value::Timestamp(value),
194            Self::Uint(value) => Value::Uint(value),
195            Self::Ulid(value) => Value::Ulid(value),
196            Self::Unit => Value::Unit,
197        }
198    }
199}
200
///
/// ScalarSlotValueRef
///
/// ScalarSlotValueRef preserves the distinction between a missing slot and an
/// explicitly persisted `NULL` scalar payload.
/// The outer `Option` from `SlotReader::get_scalar` therefore still means
/// "slot absent".
///

#[derive(Clone, Copy, Debug)]
pub enum ScalarSlotValueRef<'a> {
    // The slot is present and holds an explicit `NULL`.
    Null,
    // The slot is present and holds a scalar payload.
    Value(ScalarValueRef<'a>),
}
215
///
/// PersistedScalar
///
/// PersistedScalar defines the canonical binary payload codec for one scalar
/// leaf type.
/// Derive-generated persisted-row materializers and writers use this trait to
/// avoid routing scalar fields back through CBOR.
/// The payload handled here is the codec body only; the prefix/tag envelope is
/// added and removed by the `*_persisted_scalar_slot_payload` helpers.
///

pub trait PersistedScalar: Sized {
    /// Canonical scalar codec identifier used by schema/runtime metadata.
    const CODEC: ScalarCodec;

    /// Encode this scalar value into its codec-specific payload bytes.
    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;

    /// Decode this scalar value from its codec-specific payload bytes.
    ///
    /// `field_name` identifies the field being decoded so that failures can
    /// name it in their error message.
    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
    -> Result<Self, InternalError>;
}
236
237/// Encode one persisted slot payload using the shared leaf codec boundary.
238pub fn encode_persisted_slot_payload<T>(
239    value: &T,
240    field_name: &'static str,
241) -> Result<Vec<u8>, InternalError>
242where
243    T: serde::Serialize,
244{
245    serialize(value).map_err(|err| {
246        InternalError::serialize_internal(format!(
247            "row encode failed for field '{field_name}': {err}",
248        ))
249    })
250}
251
252/// Encode one persisted scalar slot payload using the canonical scalar envelope.
253pub fn encode_persisted_scalar_slot_payload<T>(
254    value: &T,
255    field_name: &'static str,
256) -> Result<Vec<u8>, InternalError>
257where
258    T: PersistedScalar,
259{
260    let payload = value.encode_scalar_payload()?;
261    let mut encoded = Vec::with_capacity(payload.len() + 2);
262    encoded.push(SCALAR_SLOT_PREFIX);
263    encoded.push(SCALAR_SLOT_TAG_VALUE);
264    encoded.extend_from_slice(&payload);
265
266    if encoded.len() < 2 {
267        return Err(InternalError::serialize_internal(format!(
268            "row encode failed for field '{field_name}': scalar payload envelope underflow",
269        )));
270    }
271
272    Ok(encoded)
273}
274
275/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
276pub fn encode_persisted_option_scalar_slot_payload<T>(
277    value: &Option<T>,
278    field_name: &'static str,
279) -> Result<Vec<u8>, InternalError>
280where
281    T: PersistedScalar,
282{
283    match value {
284        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
285        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
286    }
287}
288
289/// Decode one persisted slot payload using the shared leaf codec boundary.
290pub fn decode_persisted_slot_payload<T>(
291    bytes: &[u8],
292    field_name: &'static str,
293) -> Result<T, InternalError>
294where
295    T: serde::de::DeserializeOwned,
296{
297    deserialize(bytes).map_err(|err| {
298        InternalError::serialize_corruption(format!(
299            "row decode failed for field '{field_name}': {err}",
300        ))
301    })
302}
303
304/// Decode one persisted scalar slot payload using the canonical scalar envelope.
305pub fn decode_persisted_scalar_slot_payload<T>(
306    bytes: &[u8],
307    field_name: &'static str,
308) -> Result<T, InternalError>
309where
310    T: PersistedScalar,
311{
312    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
313        InternalError::serialize_corruption(format!(
314            "row decode failed for field '{field_name}': unexpected null for non-nullable scalar field",
315        ))
316    })?;
317
318    T::decode_scalar_payload(payload, field_name)
319}
320
321/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
322pub fn decode_persisted_option_scalar_slot_payload<T>(
323    bytes: &[u8],
324    field_name: &'static str,
325) -> Result<Option<T>, InternalError>
326where
327    T: PersistedScalar,
328{
329    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
330        return Ok(None);
331    };
332
333    T::decode_scalar_payload(payload, field_name).map(Some)
334}
335
336/// Build the canonical missing-field error for persisted-row materialization.
337#[must_use]
338pub fn missing_persisted_slot_error(field_name: &'static str) -> InternalError {
339    InternalError::serialize_corruption(format!(
340        "row decode failed: missing required field '{field_name}'",
341    ))
342}
343
///
/// SlotBufferWriter
///
/// SlotBufferWriter captures one row worth of slot payloads before they are
/// encoded into the canonical slot container.
///

pub(in crate::db) struct SlotBufferWriter {
    // One entry per model field, indexed by slot; `None` means no payload is
    // recorded for that slot.
    slots: Vec<Option<Vec<u8>>>,
}
354
355impl SlotBufferWriter {
356    /// Build one empty slot buffer for one entity model.
357    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
358        Self {
359            slots: vec![None; model.fields().len()],
360        }
361    }
362
363    /// Encode the buffered slots into the canonical row payload.
364    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
365        let field_count = u16::try_from(self.slots.len()).map_err(|_| {
366            InternalError::serialize_internal(format!(
367                "row encode failed: field count {} exceeds u16 slot table capacity",
368                self.slots.len(),
369            ))
370        })?;
371        let mut payload_bytes = Vec::new();
372        let mut slot_table = Vec::with_capacity(self.slots.len());
373
374        // Phase 1: assign each present slot one payload span inside the data section.
375        for slot_payload in self.slots {
376            match slot_payload {
377                Some(bytes) => {
378                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
379                        InternalError::serialize_internal(
380                            "row encode failed: slot payload start exceeds u32 range",
381                        )
382                    })?;
383                    let len = u32::try_from(bytes.len()).map_err(|_| {
384                        InternalError::serialize_internal(
385                            "row encode failed: slot payload length exceeds u32 range",
386                        )
387                    })?;
388                    payload_bytes.extend_from_slice(&bytes);
389                    slot_table.push((start, len));
390                }
391                None => slot_table.push((0, 0)),
392            }
393        }
394
395        // Phase 2: write the fixed-width slot header followed by concatenated payloads.
396        let mut encoded = Vec::with_capacity(
397            usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
398        );
399        encoded.extend_from_slice(&field_count.to_be_bytes());
400        for (start, len) in slot_table {
401            encoded.extend_from_slice(&start.to_be_bytes());
402            encoded.extend_from_slice(&len.to_be_bytes());
403        }
404        encoded.extend_from_slice(&payload_bytes);
405
406        Ok(encoded)
407    }
408}
409
410impl SlotWriter for SlotBufferWriter {
411    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
412        let entry = self.slots.get_mut(slot).ok_or_else(|| {
413            InternalError::serialize_internal(format!(
414                "row encode failed: slot {slot} is outside the row layout",
415            ))
416        })?;
417        *entry = payload.map(<[u8]>::to_vec);
418
419        Ok(())
420    }
421}
422
423/// Encode one entity into the canonical slot-container payload.
424pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
425where
426    E: PersistedRow,
427{
428    let mut writer = SlotBufferWriter::for_model(E::MODEL);
429    entity.write_slots(&mut writer)?;
430    writer.finish()
431}
432
///
/// StructuralSlotReader
///
/// StructuralSlotReader adapts the current persisted-row bytes into the
/// canonical slot-reader seam.
/// It caches decoded field values lazily so repeated index/predicate reads do
/// not re-run the same field decoder within one row planning pass.
///

pub(in crate::db) struct StructuralSlotReader<'a> {
    // Model whose field list defines the slot indices.
    model: &'static EntityModel,
    // Per-slot raw payload spans borrowed from the scanned row.
    field_bytes: StructuralRowFieldBytes<'a>,
    // One cache entry per model field, indexed by slot; filled by `get_value`.
    cached_values: Vec<CachedSlotValue>,
}
447
448impl<'a> StructuralSlotReader<'a> {
449    /// Build one slot reader over one persisted row using the current structural row scanner.
450    pub(in crate::db) fn from_raw_row(
451        raw_row: &'a RawRow,
452        model: &'static EntityModel,
453    ) -> Result<Self, InternalError> {
454        let field_bytes =
455            StructuralRowFieldBytes::from_raw_row(raw_row, model).map_err(|err| match err {
456                StructuralRowDecodeError::Deserialize(source) => source,
457            })?;
458        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
459            .take(model.fields().len())
460            .collect();
461
462        Ok(Self {
463            model,
464            field_bytes,
465            cached_values,
466        })
467    }
468
469    /// Validate the decoded primary-key slot against the authoritative row key.
470    pub(in crate::db) fn validate_storage_key_for_entity<E: EntityKind>(
471        &self,
472        data_key: &DataKey,
473    ) -> Result<(), InternalError> {
474        let Some(primary_key_slot) = resolve_primary_key_slot(E::MODEL) else {
475            return Err(InternalError::index_invariant(format!(
476                "entity primary key field missing during structural row validation: {} field={}",
477                E::PATH,
478                E::PRIMARY_KEY
479            )));
480        };
481        let field = self.field_model(primary_key_slot)?;
482        let primary_key_value = match self.get_scalar(primary_key_slot)? {
483            Some(ScalarSlotValueRef::Null) => Some(Value::Null),
484            Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
485            None => match self.field_bytes.field(primary_key_slot) {
486                Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
487                None => None,
488            },
489        };
490        let Some(primary_key_value) = primary_key_value else {
491            return Err(InternalError::serialize_corruption(format!(
492                "row decode failed: missing primary-key slot while validating {data_key}",
493            )));
494        };
495        let decoded_key = StorageKey::try_from_value(&primary_key_value).map_err(|err| {
496            InternalError::serialize_corruption(format!(
497                "row decode failed: primary-key value is not storage-key encodable: {data_key} ({err})",
498            ))
499        })?;
500        let expected_key = data_key.storage_key();
501
502        if decoded_key != expected_key {
503            return Err(InternalError::store_corruption(format!(
504                "row key mismatch: expected {expected_key}, found {decoded_key}",
505            )));
506        }
507
508        Ok(())
509    }
510
511    // Resolve one field model entry by stable slot index.
512    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
513        self.model.fields().get(slot).ok_or_else(|| {
514            InternalError::index_invariant(format!(
515                "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
516                self.model.path(),
517            ))
518        })
519    }
520}
521
522impl SlotReader for StructuralSlotReader<'_> {
523    fn model(&self) -> &'static EntityModel {
524        self.model
525    }
526
527    fn has(&self, slot: usize) -> bool {
528        self.field_bytes.field(slot).is_some()
529    }
530
531    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
532        self.field_bytes.field(slot)
533    }
534
535    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
536        let field = self.field_model(slot)?;
537        let Some(raw_value) = self.field_bytes.field(slot) else {
538            return Ok(None);
539        };
540
541        match field.leaf_codec() {
542            LeafCodec::Scalar(codec) => {
543                decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
544            }
545            LeafCodec::CborFallback => Ok(None),
546        }
547    }
548
549    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
550        let cached = self.cached_values.get(slot).ok_or_else(|| {
551            InternalError::index_invariant(format!(
552                "slot cache lookup outside model bounds during structural row access: model='{}' slot={slot}",
553                self.model.path(),
554            ))
555        })?;
556        if let CachedSlotValue::Decoded(value) = cached {
557            return Ok(value.clone());
558        }
559
560        let field = self.field_model(slot)?;
561        let value = match self.get_scalar(slot)? {
562            Some(ScalarSlotValueRef::Null) => Some(Value::Null),
563            Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
564            None => match self.field_bytes.field(slot) {
565                Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
566                None => None,
567            },
568        };
569        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
570
571        Ok(value)
572    }
573}
574
///
/// CachedSlotValue
///
/// CachedSlotValue tracks whether one slot has already been decoded during the
/// current structural row access pass.
///

enum CachedSlotValue {
    // The slot has not been decoded yet.
    Pending,
    // The slot was decoded; the inner `None` records that the slot had no value.
    Decoded(Option<Value>),
}
586
587// Encode one scalar slot value into the canonical prefixed scalar envelope.
588fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
589    match value {
590        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
591        ScalarSlotValueRef::Value(value) => {
592            let mut encoded = Vec::new();
593            encoded.push(SCALAR_SLOT_PREFIX);
594            encoded.push(SCALAR_SLOT_TAG_VALUE);
595
596            match value {
597                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
598                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
599                ScalarValueRef::Date(value) => {
600                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
601                }
602                ScalarValueRef::Duration(value) => {
603                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
604                }
605                ScalarValueRef::Float32(value) => {
606                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
607                }
608                ScalarValueRef::Float64(value) => {
609                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
610                }
611                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
612                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
613                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
614                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
615                ScalarValueRef::Timestamp(value) => {
616                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
617                }
618                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
619                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
620                ScalarValueRef::Unit => {}
621            }
622
623            encoded
624        }
625    }
626}
627
628// Split one scalar slot envelope into `NULL` vs payload bytes.
629fn decode_scalar_slot_payload_body<'a>(
630    bytes: &'a [u8],
631    field_name: &'static str,
632) -> Result<Option<&'a [u8]>, InternalError> {
633    let Some((&prefix, rest)) = bytes.split_first() else {
634        return Err(InternalError::serialize_corruption(format!(
635            "row decode failed for field '{field_name}': empty scalar payload",
636        )));
637    };
638    if prefix != SCALAR_SLOT_PREFIX {
639        return Err(InternalError::serialize_corruption(format!(
640            "row decode failed for field '{field_name}': scalar payload prefix mismatch",
641        )));
642    }
643    let Some((&tag, payload)) = rest.split_first() else {
644        return Err(InternalError::serialize_corruption(format!(
645            "row decode failed for field '{field_name}': truncated scalar payload tag",
646        )));
647    };
648
649    match tag {
650        SCALAR_SLOT_TAG_NULL => {
651            if !payload.is_empty() {
652                return Err(InternalError::serialize_corruption(format!(
653                    "row decode failed for field '{field_name}': null scalar payload has trailing bytes",
654                )));
655            }
656
657            Ok(None)
658        }
659        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
660        _ => Err(InternalError::serialize_corruption(format!(
661            "row decode failed for field '{field_name}': invalid scalar payload tag {tag}",
662        ))),
663    }
664}
665
666// Decode one scalar slot view using the field-declared scalar codec.
667#[expect(clippy::too_many_lines)]
668fn decode_scalar_slot_value<'a>(
669    bytes: &'a [u8],
670    codec: ScalarCodec,
671    field_name: &'static str,
672) -> Result<ScalarSlotValueRef<'a>, InternalError> {
673    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
674        return Ok(ScalarSlotValueRef::Null);
675    };
676
677    let value = match codec {
678        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
679        ScalarCodec::Bool => {
680            let [value] = payload else {
681                return Err(InternalError::serialize_corruption(format!(
682                    "row decode failed for field '{field_name}': bool payload must be exactly 1 byte",
683                )));
684            };
685            match value {
686                0 => ScalarValueRef::Bool(false),
687                1 => ScalarValueRef::Bool(true),
688                _ => {
689                    return Err(InternalError::serialize_corruption(format!(
690                        "row decode failed for field '{field_name}': invalid bool payload byte {value}",
691                    )));
692                }
693            }
694        }
695        ScalarCodec::Date => {
696            let bytes: [u8; 4] = payload.try_into().map_err(|_| {
697                InternalError::serialize_corruption(format!(
698                    "row decode failed for field '{field_name}': date payload must be exactly 4 bytes",
699                ))
700            })?;
701            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
702        }
703        ScalarCodec::Duration => {
704            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
705                InternalError::serialize_corruption(format!(
706                    "row decode failed for field '{field_name}': duration payload must be exactly 8 bytes",
707                ))
708            })?;
709            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
710        }
711        ScalarCodec::Float32 => {
712            let bytes: [u8; 4] = payload.try_into().map_err(|_| {
713                InternalError::serialize_corruption(format!(
714                    "row decode failed for field '{field_name}': float32 payload must be exactly 4 bytes",
715                ))
716            })?;
717            let value = f32::from_bits(u32::from_le_bytes(bytes));
718            let value = Float32::try_new(value).ok_or_else(|| {
719                InternalError::serialize_corruption(format!(
720                    "row decode failed for field '{field_name}': float32 payload is non-finite",
721                ))
722            })?;
723            ScalarValueRef::Float32(value)
724        }
725        ScalarCodec::Float64 => {
726            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
727                InternalError::serialize_corruption(format!(
728                    "row decode failed for field '{field_name}': float64 payload must be exactly 8 bytes",
729                ))
730            })?;
731            let value = f64::from_bits(u64::from_le_bytes(bytes));
732            let value = Float64::try_new(value).ok_or_else(|| {
733                InternalError::serialize_corruption(format!(
734                    "row decode failed for field '{field_name}': float64 payload is non-finite",
735                ))
736            })?;
737            ScalarValueRef::Float64(value)
738        }
739        ScalarCodec::Int64 => {
740            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
741                InternalError::serialize_corruption(format!(
742                    "row decode failed for field '{field_name}': int payload must be exactly 8 bytes",
743                ))
744            })?;
745            ScalarValueRef::Int(i64::from_le_bytes(bytes))
746        }
747        ScalarCodec::Principal => {
748            ScalarValueRef::Principal(Principal::try_from_bytes(payload).map_err(|err| {
749                InternalError::serialize_corruption(format!(
750                    "row decode failed for field '{field_name}': {err}",
751                ))
752            })?)
753        }
754        ScalarCodec::Subaccount => {
755            let bytes: [u8; 32] = payload.try_into().map_err(|_| {
756                InternalError::serialize_corruption(format!(
757                    "row decode failed for field '{field_name}': subaccount payload must be exactly 32 bytes",
758                ))
759            })?;
760            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
761        }
762        ScalarCodec::Text => {
763            let value = str::from_utf8(payload).map_err(|err| {
764                InternalError::serialize_corruption(format!(
765                    "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
766                ))
767            })?;
768            ScalarValueRef::Text(value)
769        }
770        ScalarCodec::Timestamp => {
771            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
772                InternalError::serialize_corruption(format!(
773                    "row decode failed for field '{field_name}': timestamp payload must be exactly 8 bytes",
774                ))
775            })?;
776            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
777        }
778        ScalarCodec::Uint64 => {
779            let bytes: [u8; 8] = payload.try_into().map_err(|_| {
780                InternalError::serialize_corruption(format!(
781                    "row decode failed for field '{field_name}': uint payload must be exactly 8 bytes",
782                ))
783            })?;
784            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
785        }
786        ScalarCodec::Ulid => {
787            let bytes: [u8; 16] = payload.try_into().map_err(|_| {
788                InternalError::serialize_corruption(format!(
789                    "row decode failed for field '{field_name}': ulid payload must be exactly 16 bytes",
790                ))
791            })?;
792            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
793        }
794        ScalarCodec::Unit => {
795            if !payload.is_empty() {
796                return Err(InternalError::serialize_corruption(format!(
797                    "row decode failed for field '{field_name}': unit payload must be empty",
798                )));
799            }
800            ScalarValueRef::Unit
801        }
802    };
803
804    Ok(ScalarSlotValueRef::Value(value))
805}
806
// Implement `PersistedScalar` for each listed signed integer type.
//
// Every listed type shares the `Int64` codec: values are widened to `i64` and
// stored as 8 little-endian bytes, then narrowed back with a range check on
// decode. Each `$ty` must therefore support `i64::from($ty)` and
// `<$ty>::try_from(i64)`.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                // Widen to `i64` and emit the 8-byte little-endian form; never fails.
                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    Ok(i64::from(*self).to_le_bytes().to_vec())
                }

                // Require exactly 8 bytes, read them as a little-endian `i64`,
                // and reject values that do not fit the target type.
                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let raw: [u8; 8] = bytes.try_into().map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': int payload must be exactly 8 bytes",
                        ))
                    })?;
                    <$ty>::try_from(i64::from_le_bytes(raw)).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': integer payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
836
// Implements `PersistedScalar` for each listed unsigned integer type.
//
// Every value is widened to `u64` and stored as that value's 8-byte
// little-endian representation under `ScalarCodec::Uint64`. Decoding checks the
// payload width, then narrows back to the target type and reports a value that
// does not fit as corruption.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    // The payload must be exactly as wide as a `u64`.
                    let Ok(raw) = <[u8; 8]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': uint payload must be exactly 8 bytes",
                        )));
                    };
                    let widened = u64::from_le_bytes(raw);

                    // Narrow back to the implementing type.
                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': unsigned payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
866
867impl_persisted_scalar_signed!(i8, i16, i32, i64);
868impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
869
870impl PersistedScalar for bool {
871    const CODEC: ScalarCodec = ScalarCodec::Bool;
872
873    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
874        Ok(vec![u8::from(*self)])
875    }
876
877    fn decode_scalar_payload(
878        bytes: &[u8],
879        field_name: &'static str,
880    ) -> Result<Self, InternalError> {
881        let [value] = bytes else {
882            return Err(InternalError::serialize_corruption(format!(
883                "row decode failed for field '{field_name}': bool payload must be exactly 1 byte",
884            )));
885        };
886
887        match value {
888            0 => Ok(false),
889            1 => Ok(true),
890            _ => Err(InternalError::serialize_corruption(format!(
891                "row decode failed for field '{field_name}': invalid bool payload byte {value}",
892            ))),
893        }
894    }
895}
896
897impl PersistedScalar for String {
898    const CODEC: ScalarCodec = ScalarCodec::Text;
899
900    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
901        Ok(self.as_bytes().to_vec())
902    }
903
904    fn decode_scalar_payload(
905        bytes: &[u8],
906        field_name: &'static str,
907    ) -> Result<Self, InternalError> {
908        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
909            InternalError::serialize_corruption(format!(
910                "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
911            ))
912        })
913    }
914}
915
916impl PersistedScalar for Vec<u8> {
917    const CODEC: ScalarCodec = ScalarCodec::Blob;
918
919    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
920        Ok(self.clone())
921    }
922
923    fn decode_scalar_payload(
924        bytes: &[u8],
925        _field_name: &'static str,
926    ) -> Result<Self, InternalError> {
927        Ok(bytes.to_vec())
928    }
929}
930
931impl PersistedScalar for Blob {
932    const CODEC: ScalarCodec = ScalarCodec::Blob;
933
934    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
935        Ok(self.to_vec())
936    }
937
938    fn decode_scalar_payload(
939        bytes: &[u8],
940        _field_name: &'static str,
941    ) -> Result<Self, InternalError> {
942        Ok(Self::from(bytes))
943    }
944}
945
946impl PersistedScalar for Ulid {
947    const CODEC: ScalarCodec = ScalarCodec::Ulid;
948
949    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
950        Ok(self.to_bytes().to_vec())
951    }
952
953    fn decode_scalar_payload(
954        bytes: &[u8],
955        field_name: &'static str,
956    ) -> Result<Self, InternalError> {
957        Self::try_from_bytes(bytes).map_err(|err| {
958            InternalError::serialize_corruption(format!(
959                "row decode failed for field '{field_name}': {err}",
960            ))
961        })
962    }
963}
964
965impl PersistedScalar for Timestamp {
966    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
967
968    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
969        Ok(self.as_millis().to_le_bytes().to_vec())
970    }
971
972    fn decode_scalar_payload(
973        bytes: &[u8],
974        field_name: &'static str,
975    ) -> Result<Self, InternalError> {
976        let raw: [u8; 8] = bytes.try_into().map_err(|_| {
977            InternalError::serialize_corruption(format!(
978                "row decode failed for field '{field_name}': timestamp payload must be exactly 8 bytes",
979            ))
980        })?;
981
982        Ok(Self::from_millis(i64::from_le_bytes(raw)))
983    }
984}
985
986impl PersistedScalar for Date {
987    const CODEC: ScalarCodec = ScalarCodec::Date;
988
989    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
990        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
991    }
992
993    fn decode_scalar_payload(
994        bytes: &[u8],
995        field_name: &'static str,
996    ) -> Result<Self, InternalError> {
997        let raw: [u8; 4] = bytes.try_into().map_err(|_| {
998            InternalError::serialize_corruption(format!(
999                "row decode failed for field '{field_name}': date payload must be exactly 4 bytes",
1000            ))
1001        })?;
1002
1003        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1004    }
1005}
1006
1007impl PersistedScalar for Duration {
1008    const CODEC: ScalarCodec = ScalarCodec::Duration;
1009
1010    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1011        Ok(self.as_millis().to_le_bytes().to_vec())
1012    }
1013
1014    fn decode_scalar_payload(
1015        bytes: &[u8],
1016        field_name: &'static str,
1017    ) -> Result<Self, InternalError> {
1018        let raw: [u8; 8] = bytes.try_into().map_err(|_| {
1019            InternalError::serialize_corruption(format!(
1020                "row decode failed for field '{field_name}': duration payload must be exactly 8 bytes",
1021            ))
1022        })?;
1023
1024        Ok(Self::from_millis(u64::from_le_bytes(raw)))
1025    }
1026}
1027
1028impl PersistedScalar for Float32 {
1029    const CODEC: ScalarCodec = ScalarCodec::Float32;
1030
1031    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1032        Ok(self.get().to_bits().to_le_bytes().to_vec())
1033    }
1034
1035    fn decode_scalar_payload(
1036        bytes: &[u8],
1037        field_name: &'static str,
1038    ) -> Result<Self, InternalError> {
1039        let raw: [u8; 4] = bytes.try_into().map_err(|_| {
1040            InternalError::serialize_corruption(format!(
1041                "row decode failed for field '{field_name}': float32 payload must be exactly 4 bytes",
1042            ))
1043        })?;
1044        let value = f32::from_bits(u32::from_le_bytes(raw));
1045
1046        Self::try_new(value).ok_or_else(|| {
1047            InternalError::serialize_corruption(format!(
1048                "row decode failed for field '{field_name}': float32 payload is non-finite",
1049            ))
1050        })
1051    }
1052}
1053
1054impl PersistedScalar for Float64 {
1055    const CODEC: ScalarCodec = ScalarCodec::Float64;
1056
1057    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1058        Ok(self.get().to_bits().to_le_bytes().to_vec())
1059    }
1060
1061    fn decode_scalar_payload(
1062        bytes: &[u8],
1063        field_name: &'static str,
1064    ) -> Result<Self, InternalError> {
1065        let raw: [u8; 8] = bytes.try_into().map_err(|_| {
1066            InternalError::serialize_corruption(format!(
1067                "row decode failed for field '{field_name}': float64 payload must be exactly 8 bytes",
1068            ))
1069        })?;
1070        let value = f64::from_bits(u64::from_le_bytes(raw));
1071
1072        Self::try_new(value).ok_or_else(|| {
1073            InternalError::serialize_corruption(format!(
1074                "row decode failed for field '{field_name}': float64 payload is non-finite",
1075            ))
1076        })
1077    }
1078}
1079
1080impl PersistedScalar for Principal {
1081    const CODEC: ScalarCodec = ScalarCodec::Principal;
1082
1083    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1084        self.to_bytes().map_err(|err| {
1085            InternalError::serialize_internal(format!(
1086                "row encode failed for principal field: {err}",
1087            ))
1088        })
1089    }
1090
1091    fn decode_scalar_payload(
1092        bytes: &[u8],
1093        field_name: &'static str,
1094    ) -> Result<Self, InternalError> {
1095        Self::try_from_bytes(bytes).map_err(|err| {
1096            InternalError::serialize_corruption(format!(
1097                "row decode failed for field '{field_name}': {err}",
1098            ))
1099        })
1100    }
1101}
1102
1103impl PersistedScalar for Subaccount {
1104    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1105
1106    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1107        Ok(self.to_bytes().to_vec())
1108    }
1109
1110    fn decode_scalar_payload(
1111        bytes: &[u8],
1112        field_name: &'static str,
1113    ) -> Result<Self, InternalError> {
1114        let raw: [u8; 32] = bytes.try_into().map_err(|_| {
1115            InternalError::serialize_corruption(format!(
1116                "row decode failed for field '{field_name}': subaccount payload must be exactly 32 bytes",
1117            ))
1118        })?;
1119
1120        Ok(Self::from_array(raw))
1121    }
1122}
1123
1124impl PersistedScalar for () {
1125    const CODEC: ScalarCodec = ScalarCodec::Unit;
1126
1127    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1128        Ok(Vec::new())
1129    }
1130
1131    fn decode_scalar_payload(
1132        bytes: &[u8],
1133        field_name: &'static str,
1134    ) -> Result<Self, InternalError> {
1135        if !bytes.is_empty() {
1136            return Err(InternalError::serialize_corruption(format!(
1137                "row decode failed for field '{field_name}': unit payload must be empty",
1138            )));
1139        }
1140
1141        Ok(())
1142    }
1143}