Skip to main content

icydb_core/db/data/
persisted_row.rs

1//! Module: data::persisted_row
2//! Responsibility: slot-oriented persisted-row seams over runtime row bytes.
3//! Does not own: row envelope versions, typed entity materialization, or query semantics.
4//! Boundary: commit/index planning, row writes, and typed materialization all
5//! consume the canonical slot-oriented persisted-row boundary here.
6
7use crate::{
8    db::data::{
9        DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10        decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
11        decode_structural_value_storage_bytes,
12    },
13    error::InternalError,
14    model::{
15        entity::{EntityModel, resolve_primary_key_slot},
16        field::{FieldModel, LeafCodec, ScalarCodec},
17    },
18    serialize::{deserialize, serialize},
19    traits::EntityKind,
20    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
21    value::{StorageKey, Value},
22};
23use std::str;
24
// Scalar slot envelope: one marker byte, one NULL/VALUE tag byte, then the
// codec-specific payload bytes.
const SCALAR_SLOT_PREFIX: u8 = u8::MAX;
const SCALAR_SLOT_TAG_NULL: u8 = 0x00;
const SCALAR_SLOT_TAG_VALUE: u8 = 0x01;

// Exact payload widths, in bytes, required by the fixed-width scalar codecs.
const SCALAR_BOOL_PAYLOAD_LEN: usize = std::mem::size_of::<u8>();
const SCALAR_WORD32_PAYLOAD_LEN: usize = std::mem::size_of::<u32>();
const SCALAR_WORD64_PAYLOAD_LEN: usize = std::mem::size_of::<u64>();
const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// Single-byte bool payload values accepted by the scalar decoder.
const SCALAR_BOOL_FALSE_TAG: u8 = false as u8;
const SCALAR_BOOL_TRUE_TAG: u8 = true as u8;
37
38///
39/// SlotReader
40///
41/// SlotReader exposes one persisted row as stable slot-addressable fields.
42/// Callers may inspect field presence, borrow raw field bytes, or decode one
43/// field value on demand.
44///
45
46pub trait SlotReader {
47    /// Return the structural model that owns this slot mapping.
48    fn model(&self) -> &'static EntityModel;
49
50    /// Return whether the given slot is present in the persisted row.
51    fn has(&self, slot: usize) -> bool;
52
53    /// Borrow the raw persisted payload for one slot when present.
54    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;
55
56    /// Decode one slot as a scalar leaf when the field model declares a scalar codec.
57    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;
58
59    /// Decode one slot value on demand using the field contract declared by the model.
60    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
61}
62
63///
64/// SlotWriter
65///
66/// SlotWriter is the canonical row-container output seam used by persisted-row
67/// writers.
68///
69
70pub trait SlotWriter {
71    /// Record one slot payload for the current row.
72    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
73
74    /// Record one scalar slot payload using the canonical scalar leaf envelope.
75    fn write_scalar(
76        &mut self,
77        slot: usize,
78        value: ScalarSlotValueRef<'_>,
79    ) -> Result<(), InternalError> {
80        let payload = encode_scalar_slot_value(value);
81
82        self.write_slot(slot, Some(payload.as_slice()))
83    }
84}
85
86///
87/// PersistedRow
88///
89/// PersistedRow is the derive-owned bridge between typed entities and
90/// slot-addressable persisted rows.
91/// It owns entity-specific materialization/default semantics while runtime
92/// paths stay structural at the row boundary.
93///
94
95pub trait PersistedRow: EntityKind + Sized {
96    /// Materialize one typed entity from one slot reader.
97    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
98
99    /// Write one typed entity into one slot writer.
100    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
101
102    /// Decode one slot value needed by structural planner/projection consumers.
103    fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
104    where
105        Self: crate::traits::FieldProjection,
106    {
107        let entity = Self::materialize_from_slots(slots)?;
108
109        Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
110    }
111}
112
113/// Decode one slot value through the declared field contract without routing
114/// through `SlotReader::get_value`.
115pub(in crate::db) fn decode_slot_value_by_contract(
116    slots: &dyn SlotReader,
117    slot: usize,
118) -> Result<Option<Value>, InternalError> {
119    let field = slots.model().fields().get(slot).ok_or_else(|| {
120        InternalError::persisted_row_slot_lookup_out_of_bounds(slots.model().path(), slot)
121    })?;
122
123    if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
124        && let Some(value) = slots.get_scalar(slot)?
125    {
126        return Ok(Some(match value {
127            ScalarSlotValueRef::Null => Value::Null,
128            ScalarSlotValueRef::Value(value) => value.into_value(),
129        }));
130    }
131
132    match slots.get_bytes(slot) {
133        Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
134        None => Ok(None),
135    }
136}
137
138// Decode one non-scalar slot through the exact persisted contract declared by
139// the field model.
140fn decode_non_scalar_slot_value(
141    raw_value: &[u8],
142    field: &FieldModel,
143) -> Result<Value, InternalError> {
144    let decoded = match field.storage_decode() {
145        crate::model::field::FieldStorageDecode::ByKind => {
146            decode_structural_field_by_kind_bytes(raw_value, field.kind())
147        }
148        crate::model::field::FieldStorageDecode::Value => {
149            decode_structural_value_storage_bytes(raw_value)
150        }
151    };
152
153    decoded.map_err(|err| {
154        InternalError::persisted_row_field_kind_decode_failed(field.name(), field.kind(), err)
155    })
156}
157
158///
159/// ScalarValueRef
160///
161/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
162/// slot-reader fast path.
163/// It preserves cheap references for text/blob payloads while keeping fixed
164/// width scalar wrappers as copy values.
165///
166
167#[derive(Clone, Copy, Debug)]
168pub enum ScalarValueRef<'a> {
169    Blob(&'a [u8]),
170    Bool(bool),
171    Date(Date),
172    Duration(Duration),
173    Float32(Float32),
174    Float64(Float64),
175    Int(i64),
176    Principal(Principal),
177    Subaccount(Subaccount),
178    Text(&'a str),
179    Timestamp(Timestamp),
180    Uint(u64),
181    Ulid(Ulid),
182    Unit,
183}
184
185impl ScalarValueRef<'_> {
186    /// Materialize this scalar view into the runtime `Value` enum.
187    #[must_use]
188    pub fn into_value(self) -> Value {
189        match self {
190            Self::Blob(value) => Value::Blob(value.to_vec()),
191            Self::Bool(value) => Value::Bool(value),
192            Self::Date(value) => Value::Date(value),
193            Self::Duration(value) => Value::Duration(value),
194            Self::Float32(value) => Value::Float32(value),
195            Self::Float64(value) => Value::Float64(value),
196            Self::Int(value) => Value::Int(value),
197            Self::Principal(value) => Value::Principal(value),
198            Self::Subaccount(value) => Value::Subaccount(value),
199            Self::Text(value) => Value::Text(value.to_owned()),
200            Self::Timestamp(value) => Value::Timestamp(value),
201            Self::Uint(value) => Value::Uint(value),
202            Self::Ulid(value) => Value::Ulid(value),
203            Self::Unit => Value::Unit,
204        }
205    }
206}
207
208///
209/// ScalarSlotValueRef
210///
211/// ScalarSlotValueRef preserves the distinction between a missing slot and an
212/// explicitly persisted `NULL` scalar payload.
213/// The outer `Option` from `SlotReader::get_scalar` therefore still means
214/// "slot absent".
215///
216
217#[derive(Clone, Copy, Debug)]
218pub enum ScalarSlotValueRef<'a> {
219    Null,
220    Value(ScalarValueRef<'a>),
221}
222
223///
224/// PersistedScalar
225///
226/// PersistedScalar defines the canonical binary payload codec for one scalar
227/// leaf type.
228/// Derive-generated persisted-row materializers and writers use this trait to
229/// avoid routing scalar fields back through CBOR.
230///
231
232pub trait PersistedScalar: Sized {
233    /// Canonical scalar codec identifier used by schema/runtime metadata.
234    const CODEC: ScalarCodec;
235
236    /// Encode this scalar value into its codec-specific payload bytes.
237    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;
238
239    /// Decode this scalar value from its codec-specific payload bytes.
240    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
241    -> Result<Self, InternalError>;
242}
243
244/// Encode one persisted slot payload using the shared leaf codec boundary.
245pub fn encode_persisted_slot_payload<T>(
246    value: &T,
247    field_name: &'static str,
248) -> Result<Vec<u8>, InternalError>
249where
250    T: serde::Serialize,
251{
252    serialize(value)
253        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
254}
255
256/// Encode one persisted scalar slot payload using the canonical scalar envelope.
257pub fn encode_persisted_scalar_slot_payload<T>(
258    value: &T,
259    field_name: &'static str,
260) -> Result<Vec<u8>, InternalError>
261where
262    T: PersistedScalar,
263{
264    let payload = value.encode_scalar_payload()?;
265    let mut encoded = Vec::with_capacity(payload.len() + 2);
266    encoded.push(SCALAR_SLOT_PREFIX);
267    encoded.push(SCALAR_SLOT_TAG_VALUE);
268    encoded.extend_from_slice(&payload);
269
270    if encoded.len() < 2 {
271        return Err(InternalError::persisted_row_field_encode_failed(
272            field_name,
273            "scalar payload envelope underflow",
274        ));
275    }
276
277    Ok(encoded)
278}
279
280/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
281pub fn encode_persisted_option_scalar_slot_payload<T>(
282    value: &Option<T>,
283    field_name: &'static str,
284) -> Result<Vec<u8>, InternalError>
285where
286    T: PersistedScalar,
287{
288    match value {
289        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
290        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
291    }
292}
293
294/// Decode one persisted slot payload using the shared leaf codec boundary.
295pub fn decode_persisted_slot_payload<T>(
296    bytes: &[u8],
297    field_name: &'static str,
298) -> Result<T, InternalError>
299where
300    T: serde::de::DeserializeOwned,
301{
302    deserialize(bytes)
303        .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
304}
305
306/// Decode one persisted scalar slot payload using the canonical scalar envelope.
307pub fn decode_persisted_scalar_slot_payload<T>(
308    bytes: &[u8],
309    field_name: &'static str,
310) -> Result<T, InternalError>
311where
312    T: PersistedScalar,
313{
314    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
315        InternalError::persisted_row_field_decode_failed(
316            field_name,
317            "unexpected null for non-nullable scalar field",
318        )
319    })?;
320
321    T::decode_scalar_payload(payload, field_name)
322}
323
324/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
325pub fn decode_persisted_option_scalar_slot_payload<T>(
326    bytes: &[u8],
327    field_name: &'static str,
328) -> Result<Option<T>, InternalError>
329where
330    T: PersistedScalar,
331{
332    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
333        return Ok(None);
334    };
335
336    T::decode_scalar_payload(payload, field_name).map(Some)
337}
338
339///
340/// SlotBufferWriter
341///
342/// SlotBufferWriter captures one row worth of slot payloads before they are
343/// encoded into the canonical slot container.
344///
345
346pub(in crate::db) struct SlotBufferWriter {
347    slots: Vec<Option<Vec<u8>>>,
348}
349
350impl SlotBufferWriter {
351    /// Build one empty slot buffer for one entity model.
352    pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
353        Self {
354            slots: vec![None; model.fields().len()],
355        }
356    }
357
358    /// Encode the buffered slots into the canonical row payload.
359    pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
360        let field_count = u16::try_from(self.slots.len()).map_err(|_| {
361            InternalError::persisted_row_encode_failed(format!(
362                "field count {} exceeds u16 slot table capacity",
363                self.slots.len(),
364            ))
365        })?;
366        let mut payload_bytes = Vec::new();
367        let mut slot_table = Vec::with_capacity(self.slots.len());
368
369        // Phase 1: assign each present slot one payload span inside the data section.
370        for slot_payload in self.slots {
371            match slot_payload {
372                Some(bytes) => {
373                    let start = u32::try_from(payload_bytes.len()).map_err(|_| {
374                        InternalError::persisted_row_encode_failed(
375                            "slot payload start exceeds u32 range",
376                        )
377                    })?;
378                    let len = u32::try_from(bytes.len()).map_err(|_| {
379                        InternalError::persisted_row_encode_failed(
380                            "slot payload length exceeds u32 range",
381                        )
382                    })?;
383                    payload_bytes.extend_from_slice(&bytes);
384                    slot_table.push((start, len));
385                }
386                None => slot_table.push((0, 0)),
387            }
388        }
389
390        // Phase 2: write the fixed-width slot header followed by concatenated payloads.
391        let mut encoded = Vec::with_capacity(
392            usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
393        );
394        encoded.extend_from_slice(&field_count.to_be_bytes());
395        for (start, len) in slot_table {
396            encoded.extend_from_slice(&start.to_be_bytes());
397            encoded.extend_from_slice(&len.to_be_bytes());
398        }
399        encoded.extend_from_slice(&payload_bytes);
400
401        Ok(encoded)
402    }
403}
404
405impl SlotWriter for SlotBufferWriter {
406    fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
407        let entry = self.slots.get_mut(slot).ok_or_else(|| {
408            InternalError::persisted_row_encode_failed(format!(
409                "slot {slot} is outside the row layout",
410            ))
411        })?;
412        *entry = payload.map(<[u8]>::to_vec);
413
414        Ok(())
415    }
416}
417
418/// Encode one entity into the canonical slot-container payload.
419pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
420where
421    E: PersistedRow,
422{
423    let mut writer = SlotBufferWriter::for_model(E::MODEL);
424    entity.write_slots(&mut writer)?;
425    writer.finish()
426}
427
428///
429/// StructuralSlotReader
430///
431/// StructuralSlotReader adapts the current persisted-row bytes into the
432/// canonical slot-reader seam.
433/// It caches decoded field values lazily so repeated index/predicate reads do
434/// not re-run the same field decoder within one row planning pass.
435///
436
437pub(in crate::db) struct StructuralSlotReader<'a> {
438    model: &'static EntityModel,
439    field_bytes: StructuralRowFieldBytes<'a>,
440    cached_values: Vec<CachedSlotValue>,
441}
442
443impl<'a> StructuralSlotReader<'a> {
444    /// Build one slot reader over one persisted row using the current structural row scanner.
445    pub(in crate::db) fn from_raw_row(
446        raw_row: &'a RawRow,
447        model: &'static EntityModel,
448    ) -> Result<Self, InternalError> {
449        let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
450            .map_err(StructuralRowDecodeError::into_internal_error)?;
451        let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
452            .take(model.fields().len())
453            .collect();
454
455        Ok(Self {
456            model,
457            field_bytes,
458            cached_values,
459        })
460    }
461
462    /// Validate the decoded primary-key slot against the authoritative row key.
463    pub(in crate::db) fn validate_storage_key(
464        &self,
465        data_key: &DataKey,
466    ) -> Result<(), InternalError> {
467        let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
468            return Err(InternalError::persisted_row_primary_key_field_missing(
469                self.model.path(),
470            ));
471        };
472        let field = self.field_model(primary_key_slot)?;
473        let decoded_key = match self.get_scalar(primary_key_slot)? {
474            Some(ScalarSlotValueRef::Null) => None,
475            Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
476            None => match self.field_bytes.field(primary_key_slot) {
477                Some(raw_value) => Some(
478                    decode_storage_key_field_bytes(raw_value, field.kind).map_err(|err| {
479                        InternalError::persisted_row_primary_key_not_storage_encodable(
480                            data_key, err,
481                        )
482                    })?,
483                ),
484                None => None,
485            },
486        };
487        let Some(decoded_key) = decoded_key else {
488            return Err(InternalError::persisted_row_primary_key_slot_missing(
489                data_key,
490            ));
491        };
492        let expected_key = data_key.storage_key();
493
494        if decoded_key != expected_key {
495            return Err(InternalError::persisted_row_key_mismatch(
496                expected_key,
497                decoded_key,
498            ));
499        }
500
501        Ok(())
502    }
503
504    // Resolve one field model entry by stable slot index.
505    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
506        self.model.fields().get(slot).ok_or_else(|| {
507            InternalError::persisted_row_slot_lookup_out_of_bounds(self.model.path(), slot)
508        })
509    }
510}
511
512// Convert one scalar slot fast-path value into its storage-key form when the
513// field kind is storage-key-compatible.
514const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
515    match value {
516        ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
517        ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
518        ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
519        ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
520        ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
521        ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
522        ScalarValueRef::Unit => Some(StorageKey::Unit),
523        _ => None,
524    }
525}
526
527impl SlotReader for StructuralSlotReader<'_> {
528    fn model(&self) -> &'static EntityModel {
529        self.model
530    }
531
532    fn has(&self, slot: usize) -> bool {
533        self.field_bytes.field(slot).is_some()
534    }
535
536    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
537        self.field_bytes.field(slot)
538    }
539
540    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
541        let field = self.field_model(slot)?;
542        let Some(raw_value) = self.field_bytes.field(slot) else {
543            return Ok(None);
544        };
545
546        match field.leaf_codec() {
547            LeafCodec::Scalar(codec) => {
548                decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
549            }
550            LeafCodec::CborFallback => Ok(None),
551        }
552    }
553
554    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
555        let cached = self.cached_values.get(slot).ok_or_else(|| {
556            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(self.model.path(), slot)
557        })?;
558        if let CachedSlotValue::Decoded(value) = cached {
559            return Ok(value.clone());
560        }
561
562        let field = self.field_model(slot)?;
563        let value = match self.get_scalar(slot)? {
564            Some(ScalarSlotValueRef::Null) => Some(Value::Null),
565            Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
566            None => match self.field_bytes.field(slot) {
567                Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
568                None => None,
569            },
570        };
571        self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
572
573        Ok(value)
574    }
575}
576
577///
578/// CachedSlotValue
579///
580/// CachedSlotValue tracks whether one slot has already been decoded during the
581/// current structural row access pass.
582///
583
584enum CachedSlotValue {
585    Pending,
586    Decoded(Option<Value>),
587}
588
589// Encode one scalar slot value into the canonical prefixed scalar envelope.
590fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
591    match value {
592        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
593        ScalarSlotValueRef::Value(value) => {
594            let mut encoded = Vec::new();
595            encoded.push(SCALAR_SLOT_PREFIX);
596            encoded.push(SCALAR_SLOT_TAG_VALUE);
597
598            match value {
599                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
600                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
601                ScalarValueRef::Date(value) => {
602                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
603                }
604                ScalarValueRef::Duration(value) => {
605                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
606                }
607                ScalarValueRef::Float32(value) => {
608                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
609                }
610                ScalarValueRef::Float64(value) => {
611                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
612                }
613                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
614                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
615                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
616                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
617                ScalarValueRef::Timestamp(value) => {
618                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
619                }
620                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
621                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
622                ScalarValueRef::Unit => {}
623            }
624
625            encoded
626        }
627    }
628}
629
630// Split one scalar slot envelope into `NULL` vs payload bytes.
631fn decode_scalar_slot_payload_body<'a>(
632    bytes: &'a [u8],
633    field_name: &'static str,
634) -> Result<Option<&'a [u8]>, InternalError> {
635    let Some((&prefix, rest)) = bytes.split_first() else {
636        return Err(InternalError::persisted_row_field_decode_failed(
637            field_name,
638            "empty scalar payload",
639        ));
640    };
641    if prefix != SCALAR_SLOT_PREFIX {
642        return Err(InternalError::persisted_row_field_decode_failed(
643            field_name,
644            "scalar payload prefix mismatch",
645        ));
646    }
647    let Some((&tag, payload)) = rest.split_first() else {
648        return Err(InternalError::persisted_row_field_decode_failed(
649            field_name,
650            "truncated scalar payload tag",
651        ));
652    };
653
654    match tag {
655        SCALAR_SLOT_TAG_NULL => {
656            if !payload.is_empty() {
657                return Err(InternalError::persisted_row_field_decode_failed(
658                    field_name,
659                    "null scalar payload has trailing bytes",
660                ));
661            }
662
663            Ok(None)
664        }
665        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
666        _ => Err(InternalError::persisted_row_field_decode_failed(
667            field_name,
668            format!("invalid scalar payload tag {tag}"),
669        )),
670    }
671}
672
673// Decode one scalar slot view using the field-declared scalar codec.
674#[expect(clippy::too_many_lines)]
675fn decode_scalar_slot_value<'a>(
676    bytes: &'a [u8],
677    codec: ScalarCodec,
678    field_name: &'static str,
679) -> Result<ScalarSlotValueRef<'a>, InternalError> {
680    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
681        return Ok(ScalarSlotValueRef::Null);
682    };
683
684    let value = match codec {
685        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
686        ScalarCodec::Bool => {
687            let [value] = payload else {
688                return Err(
689                    InternalError::persisted_row_field_payload_exact_len_required(
690                        field_name,
691                        "bool",
692                        SCALAR_BOOL_PAYLOAD_LEN,
693                    ),
694                );
695            };
696            match *value {
697                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
698                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
699                _ => {
700                    return Err(InternalError::persisted_row_field_payload_invalid_byte(
701                        field_name, "bool", *value,
702                    ));
703                }
704            }
705        }
706        ScalarCodec::Date => {
707            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
708                InternalError::persisted_row_field_payload_exact_len_required(
709                    field_name,
710                    "date",
711                    SCALAR_WORD32_PAYLOAD_LEN,
712                )
713            })?;
714            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
715        }
716        ScalarCodec::Duration => {
717            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
718                InternalError::persisted_row_field_payload_exact_len_required(
719                    field_name,
720                    "duration",
721                    SCALAR_WORD64_PAYLOAD_LEN,
722                )
723            })?;
724            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
725        }
726        ScalarCodec::Float32 => {
727            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
728                InternalError::persisted_row_field_payload_exact_len_required(
729                    field_name,
730                    "float32",
731                    SCALAR_WORD32_PAYLOAD_LEN,
732                )
733            })?;
734            let value = f32::from_bits(u32::from_le_bytes(bytes));
735            let value = Float32::try_new(value).ok_or_else(|| {
736                InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
737            })?;
738            ScalarValueRef::Float32(value)
739        }
740        ScalarCodec::Float64 => {
741            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
742                InternalError::persisted_row_field_payload_exact_len_required(
743                    field_name,
744                    "float64",
745                    SCALAR_WORD64_PAYLOAD_LEN,
746                )
747            })?;
748            let value = f64::from_bits(u64::from_le_bytes(bytes));
749            let value = Float64::try_new(value).ok_or_else(|| {
750                InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
751            })?;
752            ScalarValueRef::Float64(value)
753        }
754        ScalarCodec::Int64 => {
755            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
756                InternalError::persisted_row_field_payload_exact_len_required(
757                    field_name,
758                    "int",
759                    SCALAR_WORD64_PAYLOAD_LEN,
760                )
761            })?;
762            ScalarValueRef::Int(i64::from_le_bytes(bytes))
763        }
764        ScalarCodec::Principal => ScalarValueRef::Principal(
765            Principal::try_from_bytes(payload)
766                .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))?,
767        ),
768        ScalarCodec::Subaccount => {
769            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
770                InternalError::persisted_row_field_payload_exact_len_required(
771                    field_name,
772                    "subaccount",
773                    SCALAR_SUBACCOUNT_PAYLOAD_LEN,
774                )
775            })?;
776            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
777        }
778        ScalarCodec::Text => {
779            let value = str::from_utf8(payload).map_err(|err| {
780                InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
781            })?;
782            ScalarValueRef::Text(value)
783        }
784        ScalarCodec::Timestamp => {
785            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
786                InternalError::persisted_row_field_payload_exact_len_required(
787                    field_name,
788                    "timestamp",
789                    SCALAR_WORD64_PAYLOAD_LEN,
790                )
791            })?;
792            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
793        }
794        ScalarCodec::Uint64 => {
795            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
796                InternalError::persisted_row_field_payload_exact_len_required(
797                    field_name,
798                    "uint",
799                    SCALAR_WORD64_PAYLOAD_LEN,
800                )
801            })?;
802            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
803        }
804        ScalarCodec::Ulid => {
805            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
806                InternalError::persisted_row_field_payload_exact_len_required(
807                    field_name,
808                    "ulid",
809                    SCALAR_ULID_PAYLOAD_LEN,
810                )
811            })?;
812            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
813        }
814        ScalarCodec::Unit => {
815            if !payload.is_empty() {
816                return Err(InternalError::persisted_row_field_payload_must_be_empty(
817                    field_name, "unit",
818                ));
819            }
820            ScalarValueRef::Unit
821        }
822    };
823
824    Ok(ScalarSlotValueRef::Value(value))
825}
826
// Implements `PersistedScalar` for signed integer primitives.
//
// Every listed type is stored under `ScalarCodec::Int64`: the value is widened
// to `i64` and written as little-endian bytes. Decoding requires exactly
// `SCALAR_WORD64_PAYLOAD_LEN` bytes and rejects stored values that do not fit
// the narrower target type.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    // The payload must be one full 64-bit word.
                    let Ok(word) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "int",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = i64::from_le_bytes(word);

                    // Narrow back to the declared type; overflow is reported, never wrapped.
                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "integer",
                        )
                    })
                }
            }
        )*
    };
}
859
// Implements `PersistedScalar` for unsigned integer primitives.
//
// Every listed type is stored under `ScalarCodec::Uint64`: the value is widened
// to `u64` and written as little-endian bytes. Decoding requires exactly
// `SCALAR_WORD64_PAYLOAD_LEN` bytes and rejects stored values that do not fit
// the narrower target type.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    // The payload must be one full 64-bit word.
                    let Ok(word) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "uint",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = u64::from_le_bytes(word);

                    // Narrow back to the declared type; overflow is reported, never wrapped.
                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "unsigned",
                        )
                    })
                }
            }
        )*
    };
}
892
893impl_persisted_scalar_signed!(i8, i16, i32, i64);
894impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
895
896impl PersistedScalar for bool {
897    const CODEC: ScalarCodec = ScalarCodec::Bool;
898
899    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
900        Ok(vec![u8::from(*self)])
901    }
902
903    fn decode_scalar_payload(
904        bytes: &[u8],
905        field_name: &'static str,
906    ) -> Result<Self, InternalError> {
907        let [value] = bytes else {
908            return Err(
909                InternalError::persisted_row_field_payload_exact_len_required(
910                    field_name,
911                    "bool",
912                    SCALAR_BOOL_PAYLOAD_LEN,
913                ),
914            );
915        };
916
917        match *value {
918            SCALAR_BOOL_FALSE_TAG => Ok(false),
919            SCALAR_BOOL_TRUE_TAG => Ok(true),
920            _ => Err(InternalError::persisted_row_field_payload_invalid_byte(
921                field_name, "bool", *value,
922            )),
923        }
924    }
925}
926
927impl PersistedScalar for String {
928    const CODEC: ScalarCodec = ScalarCodec::Text;
929
930    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
931        Ok(self.as_bytes().to_vec())
932    }
933
934    fn decode_scalar_payload(
935        bytes: &[u8],
936        field_name: &'static str,
937    ) -> Result<Self, InternalError> {
938        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
939            InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
940        })
941    }
942}
943
944impl PersistedScalar for Vec<u8> {
945    const CODEC: ScalarCodec = ScalarCodec::Blob;
946
947    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
948        Ok(self.clone())
949    }
950
951    fn decode_scalar_payload(
952        bytes: &[u8],
953        _field_name: &'static str,
954    ) -> Result<Self, InternalError> {
955        Ok(bytes.to_vec())
956    }
957}
958
959impl PersistedScalar for Blob {
960    const CODEC: ScalarCodec = ScalarCodec::Blob;
961
962    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
963        Ok(self.to_vec())
964    }
965
966    fn decode_scalar_payload(
967        bytes: &[u8],
968        _field_name: &'static str,
969    ) -> Result<Self, InternalError> {
970        Ok(Self::from(bytes))
971    }
972}
973
974impl PersistedScalar for Ulid {
975    const CODEC: ScalarCodec = ScalarCodec::Ulid;
976
977    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
978        Ok(self.to_bytes().to_vec())
979    }
980
981    fn decode_scalar_payload(
982        bytes: &[u8],
983        field_name: &'static str,
984    ) -> Result<Self, InternalError> {
985        Self::try_from_bytes(bytes)
986            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
987    }
988}
989
990impl PersistedScalar for Timestamp {
991    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
992
993    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
994        Ok(self.as_millis().to_le_bytes().to_vec())
995    }
996
997    fn decode_scalar_payload(
998        bytes: &[u8],
999        field_name: &'static str,
1000    ) -> Result<Self, InternalError> {
1001        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1002            InternalError::persisted_row_field_payload_exact_len_required(
1003                field_name,
1004                "timestamp",
1005                SCALAR_WORD64_PAYLOAD_LEN,
1006            )
1007        })?;
1008
1009        Ok(Self::from_millis(i64::from_le_bytes(raw)))
1010    }
1011}
1012
1013impl PersistedScalar for Date {
1014    const CODEC: ScalarCodec = ScalarCodec::Date;
1015
1016    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1017        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1018    }
1019
1020    fn decode_scalar_payload(
1021        bytes: &[u8],
1022        field_name: &'static str,
1023    ) -> Result<Self, InternalError> {
1024        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1025            InternalError::persisted_row_field_payload_exact_len_required(
1026                field_name,
1027                "date",
1028                SCALAR_WORD32_PAYLOAD_LEN,
1029            )
1030        })?;
1031
1032        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1033    }
1034}
1035
1036impl PersistedScalar for Duration {
1037    const CODEC: ScalarCodec = ScalarCodec::Duration;
1038
1039    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1040        Ok(self.as_millis().to_le_bytes().to_vec())
1041    }
1042
1043    fn decode_scalar_payload(
1044        bytes: &[u8],
1045        field_name: &'static str,
1046    ) -> Result<Self, InternalError> {
1047        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1048            InternalError::persisted_row_field_payload_exact_len_required(
1049                field_name,
1050                "duration",
1051                SCALAR_WORD64_PAYLOAD_LEN,
1052            )
1053        })?;
1054
1055        Ok(Self::from_millis(u64::from_le_bytes(raw)))
1056    }
1057}
1058
1059impl PersistedScalar for Float32 {
1060    const CODEC: ScalarCodec = ScalarCodec::Float32;
1061
1062    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1063        Ok(self.get().to_bits().to_le_bytes().to_vec())
1064    }
1065
1066    fn decode_scalar_payload(
1067        bytes: &[u8],
1068        field_name: &'static str,
1069    ) -> Result<Self, InternalError> {
1070        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1071            InternalError::persisted_row_field_payload_exact_len_required(
1072                field_name,
1073                "float32",
1074                SCALAR_WORD32_PAYLOAD_LEN,
1075            )
1076        })?;
1077        let value = f32::from_bits(u32::from_le_bytes(raw));
1078
1079        Self::try_new(value).ok_or_else(|| {
1080            InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
1081        })
1082    }
1083}
1084
1085impl PersistedScalar for Float64 {
1086    const CODEC: ScalarCodec = ScalarCodec::Float64;
1087
1088    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1089        Ok(self.get().to_bits().to_le_bytes().to_vec())
1090    }
1091
1092    fn decode_scalar_payload(
1093        bytes: &[u8],
1094        field_name: &'static str,
1095    ) -> Result<Self, InternalError> {
1096        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1097            InternalError::persisted_row_field_payload_exact_len_required(
1098                field_name,
1099                "float64",
1100                SCALAR_WORD64_PAYLOAD_LEN,
1101            )
1102        })?;
1103        let value = f64::from_bits(u64::from_le_bytes(raw));
1104
1105        Self::try_new(value).ok_or_else(|| {
1106            InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
1107        })
1108    }
1109}
1110
1111impl PersistedScalar for Principal {
1112    const CODEC: ScalarCodec = ScalarCodec::Principal;
1113
1114    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1115        self.to_bytes()
1116            .map_err(|err| InternalError::persisted_row_field_encode_failed("principal", err))
1117    }
1118
1119    fn decode_scalar_payload(
1120        bytes: &[u8],
1121        field_name: &'static str,
1122    ) -> Result<Self, InternalError> {
1123        Self::try_from_bytes(bytes)
1124            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
1125    }
1126}
1127
1128impl PersistedScalar for Subaccount {
1129    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1130
1131    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1132        Ok(self.to_bytes().to_vec())
1133    }
1134
1135    fn decode_scalar_payload(
1136        bytes: &[u8],
1137        field_name: &'static str,
1138    ) -> Result<Self, InternalError> {
1139        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1140            InternalError::persisted_row_field_payload_exact_len_required(
1141                field_name,
1142                "subaccount",
1143                SCALAR_SUBACCOUNT_PAYLOAD_LEN,
1144            )
1145        })?;
1146
1147        Ok(Self::from_array(raw))
1148    }
1149}
1150
1151impl PersistedScalar for () {
1152    const CODEC: ScalarCodec = ScalarCodec::Unit;
1153
1154    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1155        Ok(Vec::new())
1156    }
1157
1158    fn decode_scalar_payload(
1159        bytes: &[u8],
1160        field_name: &'static str,
1161    ) -> Result<Self, InternalError> {
1162        if !bytes.is_empty() {
1163            return Err(InternalError::persisted_row_field_payload_must_be_empty(
1164                field_name, "unit",
1165            ));
1166        }
1167
1168        Ok(())
1169    }
1170}