Skip to main content

icydb_core/db/data/persisted_row/
codec.rs

//! Module: db::data::persisted_row::codec
//! Defines the persisted scalar row-slot encoding and borrowed decoding helpers
//! used by runtime row access.
4
5use crate::{
6    db::data::{decode_structural_value_storage_bytes, encode_structural_value_storage_bytes},
7    error::InternalError,
8    model::field::ScalarCodec,
9    traits::{FieldValue, field_value_vec_from_value},
10    types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid, Unit},
11    value::Value,
12};
13use serde::{Serialize, de::DeserializeOwned};
14use serde_cbor::{from_slice, to_vec};
15use std::{
16    panic::{AssertUnwindSafe, catch_unwind},
17    str,
18};
19
// Scalar slot envelope header: one prefix byte followed by one tag byte.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
// Tag written for an explicit `NULL` scalar slot (no payload follows).
const SCALAR_SLOT_TAG_NULL: u8 = 0x00;
// Tag written ahead of a present scalar payload.
const SCALAR_SLOT_TAG_VALUE: u8 = 0x01;
// Single-byte CBOR `null` used to recognise a `NULL` structural slot payload.
const CBOR_NULL_PAYLOAD: [u8; 1] = [0xF6];

// Exact payload widths enforced when decoding fixed-width scalars.
const SCALAR_BOOL_PAYLOAD_LEN: usize = 1;
const SCALAR_WORD32_PAYLOAD_LEN: usize = 4;
const SCALAR_WORD64_PAYLOAD_LEN: usize = 8;
const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// The only two bytes accepted as a bool payload.
const SCALAR_BOOL_FALSE_TAG: u8 = 0x00;
const SCALAR_BOOL_TRUE_TAG: u8 = 0x01;
33
34///
35/// ScalarValueRef
36///
37/// ScalarValueRef is the borrowed-or-copy scalar payload view returned by the
38/// slot-reader fast path.
39/// It preserves cheap references for text/blob payloads while keeping fixed
40/// width scalar wrappers as copy values.
41///
42
43#[derive(Clone, Copy, Debug)]
44pub enum ScalarValueRef<'a> {
45    Blob(&'a [u8]),
46    Bool(bool),
47    Date(Date),
48    Duration(Duration),
49    Float32(Float32),
50    Float64(Float64),
51    Int(i64),
52    Principal(Principal),
53    Subaccount(Subaccount),
54    Text(&'a str),
55    Timestamp(Timestamp),
56    Uint(u64),
57    Ulid(Ulid),
58    Unit,
59}
60
61impl ScalarValueRef<'_> {
62    /// Materialize this scalar view into the runtime `Value` enum.
63    #[must_use]
64    pub fn into_value(self) -> Value {
65        match self {
66            Self::Blob(value) => Value::Blob(value.to_vec()),
67            Self::Bool(value) => Value::Bool(value),
68            Self::Date(value) => Value::Date(value),
69            Self::Duration(value) => Value::Duration(value),
70            Self::Float32(value) => Value::Float32(value),
71            Self::Float64(value) => Value::Float64(value),
72            Self::Int(value) => Value::Int(value),
73            Self::Principal(value) => Value::Principal(value),
74            Self::Subaccount(value) => Value::Subaccount(value),
75            Self::Text(value) => Value::Text(value.to_owned()),
76            Self::Timestamp(value) => Value::Timestamp(value),
77            Self::Uint(value) => Value::Uint(value),
78            Self::Ulid(value) => Value::Ulid(value),
79            Self::Unit => Value::Unit,
80        }
81    }
82}
83
84///
85/// ScalarSlotValueRef
86///
87/// ScalarSlotValueRef preserves the distinction between a missing slot and an
88/// explicitly persisted `NULL` scalar payload.
89/// The outer `Option` from `SlotReader::get_scalar` therefore still means
90/// "slot absent".
91///
92
93#[derive(Clone, Copy, Debug)]
94pub enum ScalarSlotValueRef<'a> {
95    Null,
96    Value(ScalarValueRef<'a>),
97}
98
99///
100/// PersistedScalar
101///
102/// PersistedScalar defines the canonical binary payload codec for one scalar
103/// leaf type.
104/// Derive-generated persisted-row materializers and writers use this trait to
105/// avoid routing scalar fields back through CBOR.
106///
107
108pub trait PersistedScalar: Sized {
109    /// Canonical scalar codec identifier used by schema/runtime metadata.
110    const CODEC: ScalarCodec;
111
112    /// Encode this scalar value into its codec-specific payload bytes.
113    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;
114
115    /// Decode this scalar value from its codec-specific payload bytes.
116    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
117    -> Result<Self, InternalError>;
118}
119
120/// Encode one persisted slot payload using the shared leaf codec boundary.
121pub fn encode_persisted_slot_payload<T>(
122    value: &T,
123    field_name: &'static str,
124) -> Result<Vec<u8>, InternalError>
125where
126    T: Serialize,
127{
128    encode_structural_slot_payload_bytes(value, field_name)
129}
130
131/// Encode one persisted scalar slot payload using the canonical scalar envelope.
132pub fn encode_persisted_scalar_slot_payload<T>(
133    value: &T,
134    field_name: &'static str,
135) -> Result<Vec<u8>, InternalError>
136where
137    T: PersistedScalar,
138{
139    let payload = value.encode_scalar_payload()?;
140    let mut encoded = Vec::with_capacity(payload.len() + 2);
141    encoded.push(SCALAR_SLOT_PREFIX);
142    encoded.push(SCALAR_SLOT_TAG_VALUE);
143    encoded.extend_from_slice(&payload);
144
145    if encoded.len() < 2 {
146        return Err(InternalError::persisted_row_field_encode_failed(
147            field_name,
148            "scalar payload envelope underflow",
149        ));
150    }
151
152    Ok(encoded)
153}
154
155/// Encode one optional persisted scalar slot payload preserving explicit `NULL`.
156pub fn encode_persisted_option_scalar_slot_payload<T>(
157    value: &Option<T>,
158    field_name: &'static str,
159) -> Result<Vec<u8>, InternalError>
160where
161    T: PersistedScalar,
162{
163    match value {
164        Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
165        None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
166    }
167}
168
169/// Decode one persisted slot payload using the shared leaf codec boundary.
170pub fn decode_persisted_slot_payload<T>(
171    bytes: &[u8],
172    field_name: &'static str,
173) -> Result<T, InternalError>
174where
175    T: DeserializeOwned,
176{
177    decode_structural_slot_payload_bytes(bytes, field_name)
178}
179
180// Decode one structural persisted payload, preserving the explicit CBOR null
181// sentinel instead of forcing each wrapper to repeat the same branch.
182fn decode_persisted_structural_slot_payload<T>(
183    bytes: &[u8],
184    field_name: &'static str,
185) -> Result<Option<T>, InternalError>
186where
187    T: DeserializeOwned,
188{
189    if persisted_slot_payload_is_null(bytes) {
190        return Ok(None);
191    }
192
193    decode_persisted_slot_payload(bytes, field_name).map(Some)
194}
195
196/// Decode one non-null persisted slot payload through the shared leaf codec boundary.
197pub fn decode_persisted_non_null_slot_payload<T>(
198    bytes: &[u8],
199    field_name: &'static str,
200) -> Result<T, InternalError>
201where
202    T: DeserializeOwned,
203{
204    decode_persisted_structural_slot_payload(bytes, field_name)?.ok_or_else(|| {
205        InternalError::persisted_row_field_decode_failed(
206            field_name,
207            "unexpected null for non-nullable field",
208        )
209    })
210}
211
212/// Decode one optional persisted slot payload preserving explicit CBOR `NULL`.
213pub fn decode_persisted_option_slot_payload<T>(
214    bytes: &[u8],
215    field_name: &'static str,
216) -> Result<Option<T>, InternalError>
217where
218    T: DeserializeOwned,
219{
220    decode_persisted_structural_slot_payload(bytes, field_name)
221}
222
223// Encode one arbitrary structural field payload directly into its persisted
224// CBOR bytes without routing through the generic serialize facade.
225fn encode_structural_slot_payload_bytes<T>(
226    value: &T,
227    field_name: &'static str,
228) -> Result<Vec<u8>, InternalError>
229where
230    T: Serialize,
231{
232    to_vec(value).map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
233}
234
235// Decode one arbitrary structural field payload directly from its persisted
236// CBOR bytes while preserving the local "no panic escapes decode" boundary.
237fn decode_structural_slot_payload_bytes<T>(
238    bytes: &[u8],
239    field_name: &'static str,
240) -> Result<T, InternalError>
241where
242    T: DeserializeOwned,
243{
244    let result = catch_unwind(AssertUnwindSafe(|| from_slice(bytes)));
245
246    match result {
247        Ok(Ok(value)) => Ok(value),
248        Ok(Err(err)) => Err(InternalError::persisted_row_field_decode_failed(
249            field_name, err,
250        )),
251        Err(_) => Err(InternalError::persisted_row_field_decode_failed(
252            field_name,
253            "panic during persisted structural field decode",
254        )),
255    }
256}
257
258// Decode one persisted `Value` payload, then let a narrow owner-local
259// conversion closure rebuild the typed field contract.
260fn decode_persisted_value_payload_as<T, F>(
261    bytes: &[u8],
262    field_name: &'static str,
263    mismatch: impl FnOnce() -> String,
264    decode: F,
265) -> Result<T, InternalError>
266where
267    F: FnOnce(&Value) -> Option<T>,
268{
269    let value = decode_structural_value_storage_bytes(bytes)
270        .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))?;
271
272    decode(&value)
273        .ok_or_else(|| InternalError::persisted_row_field_decode_failed(field_name, mismatch()))
274}
275
276// Encode one already materialized `Value` through the shared structural leaf
277// payload seam.
278fn encode_persisted_value_payload(
279    value: Value,
280    field_name: &'static str,
281) -> Result<Vec<u8>, InternalError> {
282    encode_structural_value_storage_bytes(&value)
283        .map_err(|err| InternalError::persisted_row_field_encode_failed(field_name, err))
284}
285
286/// Decode one persisted custom-schema payload through `Value` and reconstruct
287/// the typed field via `FieldValue`.
288pub fn decode_persisted_custom_slot_payload<T>(
289    bytes: &[u8],
290    field_name: &'static str,
291) -> Result<T, InternalError>
292where
293    T: FieldValue,
294{
295    decode_persisted_value_payload_as(
296        bytes,
297        field_name,
298        || {
299            format!(
300                "value payload does not match {}",
301                std::any::type_name::<T>()
302            )
303        },
304        T::from_value,
305    )
306}
307
308/// Decode one persisted repeated custom-schema payload through `Value::List`
309/// and reconstruct each item via `FieldValue`.
310pub fn decode_persisted_custom_many_slot_payload<T>(
311    bytes: &[u8],
312    field_name: &'static str,
313) -> Result<Vec<T>, InternalError>
314where
315    T: FieldValue,
316{
317    decode_persisted_value_payload_as(
318        bytes,
319        field_name,
320        || {
321            format!(
322                "value payload does not match Vec<{}>",
323                std::any::type_name::<T>()
324            )
325        },
326        field_value_vec_from_value::<T>,
327    )
328}
329
330/// Encode one custom-schema field payload through its `FieldValue`
331/// representation so structural decode preserves nested custom values.
332pub fn encode_persisted_custom_slot_payload<T>(
333    value: &T,
334    field_name: &'static str,
335) -> Result<Vec<u8>, InternalError>
336where
337    T: FieldValue,
338{
339    encode_persisted_value_payload(value.to_value(), field_name)
340}
341
342/// Encode one repeated custom-schema payload through `Value::List`.
343pub fn encode_persisted_custom_many_slot_payload<T>(
344    values: &[T],
345    field_name: &'static str,
346) -> Result<Vec<u8>, InternalError>
347where
348    T: FieldValue,
349{
350    encode_persisted_value_payload(
351        Value::List(values.iter().map(FieldValue::to_value).collect()),
352        field_name,
353    )
354}
355
356/// Decode one persisted scalar slot payload using the canonical scalar envelope.
357pub fn decode_persisted_scalar_slot_payload<T>(
358    bytes: &[u8],
359    field_name: &'static str,
360) -> Result<T, InternalError>
361where
362    T: PersistedScalar,
363{
364    let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
365        InternalError::persisted_row_field_decode_failed(
366            field_name,
367            "unexpected null for non-nullable scalar field",
368        )
369    })?;
370
371    T::decode_scalar_payload(payload, field_name)
372}
373
374/// Decode one optional persisted scalar slot payload preserving explicit `NULL`.
375pub fn decode_persisted_option_scalar_slot_payload<T>(
376    bytes: &[u8],
377    field_name: &'static str,
378) -> Result<Option<T>, InternalError>
379where
380    T: PersistedScalar,
381{
382    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
383        return Ok(None);
384    };
385
386    T::decode_scalar_payload(payload, field_name).map(Some)
387}
388
389// Detect the canonical persisted CBOR null payload used by optional structural slots.
390fn persisted_slot_payload_is_null(bytes: &[u8]) -> bool {
391    bytes == CBOR_NULL_PAYLOAD
392}
393
394// Encode one scalar slot value into the canonical prefixed scalar envelope.
395pub(super) fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
396    match value {
397        ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
398        ScalarSlotValueRef::Value(value) => {
399            let mut encoded = Vec::new();
400            encoded.push(SCALAR_SLOT_PREFIX);
401            encoded.push(SCALAR_SLOT_TAG_VALUE);
402
403            match value {
404                ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
405                ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
406                ScalarValueRef::Date(value) => {
407                    encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
408                }
409                ScalarValueRef::Duration(value) => {
410                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
411                }
412                ScalarValueRef::Float32(value) => {
413                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
414                }
415                ScalarValueRef::Float64(value) => {
416                    encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
417                }
418                ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
419                ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
420                ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
421                ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
422                ScalarValueRef::Timestamp(value) => {
423                    encoded.extend_from_slice(&value.as_millis().to_le_bytes());
424                }
425                ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
426                ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
427                ScalarValueRef::Unit => {}
428            }
429
430            encoded
431        }
432    }
433}
434
435// Split one scalar slot envelope into `NULL` vs payload bytes.
436fn decode_scalar_slot_payload_body<'a>(
437    bytes: &'a [u8],
438    field_name: &'static str,
439) -> Result<Option<&'a [u8]>, InternalError> {
440    let Some((&prefix, rest)) = bytes.split_first() else {
441        return Err(InternalError::persisted_row_field_decode_failed(
442            field_name,
443            "empty scalar payload",
444        ));
445    };
446    if prefix != SCALAR_SLOT_PREFIX {
447        return Err(InternalError::persisted_row_field_decode_failed(
448            field_name,
449            format!(
450                "scalar payload prefix mismatch: expected slot envelope prefix byte 0x{SCALAR_SLOT_PREFIX:02X}, found 0x{prefix:02X}",
451            ),
452        ));
453    }
454    let Some((&tag, payload)) = rest.split_first() else {
455        return Err(InternalError::persisted_row_field_decode_failed(
456            field_name,
457            "truncated scalar payload tag",
458        ));
459    };
460
461    match tag {
462        SCALAR_SLOT_TAG_NULL => {
463            if !payload.is_empty() {
464                return Err(InternalError::persisted_row_field_decode_failed(
465                    field_name,
466                    "null scalar payload has trailing bytes",
467                ));
468            }
469
470            Ok(None)
471        }
472        SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
473        _ => Err(InternalError::persisted_row_field_decode_failed(
474            field_name,
475            format!("invalid scalar payload tag {tag}"),
476        )),
477    }
478}
479
480// Decode one scalar slot view using the field-declared scalar codec.
481#[expect(clippy::too_many_lines)]
482pub(super) fn decode_scalar_slot_value<'a>(
483    bytes: &'a [u8],
484    codec: ScalarCodec,
485    field_name: &'static str,
486) -> Result<ScalarSlotValueRef<'a>, InternalError> {
487    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
488        return Ok(ScalarSlotValueRef::Null);
489    };
490
491    let value = match codec {
492        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
493        ScalarCodec::Bool => {
494            let [value] = payload else {
495                return Err(
496                    InternalError::persisted_row_field_payload_exact_len_required(
497                        field_name,
498                        "bool",
499                        SCALAR_BOOL_PAYLOAD_LEN,
500                    ),
501                );
502            };
503            match *value {
504                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
505                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
506                _ => {
507                    return Err(InternalError::persisted_row_field_payload_invalid_byte(
508                        field_name, "bool", *value,
509                    ));
510                }
511            }
512        }
513        ScalarCodec::Date => {
514            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
515                InternalError::persisted_row_field_payload_exact_len_required(
516                    field_name,
517                    "date",
518                    SCALAR_WORD32_PAYLOAD_LEN,
519                )
520            })?;
521            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
522        }
523        ScalarCodec::Duration => {
524            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
525                InternalError::persisted_row_field_payload_exact_len_required(
526                    field_name,
527                    "duration",
528                    SCALAR_WORD64_PAYLOAD_LEN,
529                )
530            })?;
531            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
532        }
533        ScalarCodec::Float32 => {
534            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
535                InternalError::persisted_row_field_payload_exact_len_required(
536                    field_name,
537                    "float32",
538                    SCALAR_WORD32_PAYLOAD_LEN,
539                )
540            })?;
541            let value = f32::from_bits(u32::from_le_bytes(bytes));
542            let value = Float32::try_new(value).ok_or_else(|| {
543                InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
544            })?;
545            ScalarValueRef::Float32(value)
546        }
547        ScalarCodec::Float64 => {
548            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
549                InternalError::persisted_row_field_payload_exact_len_required(
550                    field_name,
551                    "float64",
552                    SCALAR_WORD64_PAYLOAD_LEN,
553                )
554            })?;
555            let value = f64::from_bits(u64::from_le_bytes(bytes));
556            let value = Float64::try_new(value).ok_or_else(|| {
557                InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
558            })?;
559            ScalarValueRef::Float64(value)
560        }
561        ScalarCodec::Int64 => {
562            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
563                InternalError::persisted_row_field_payload_exact_len_required(
564                    field_name,
565                    "int",
566                    SCALAR_WORD64_PAYLOAD_LEN,
567                )
568            })?;
569            ScalarValueRef::Int(i64::from_le_bytes(bytes))
570        }
571        ScalarCodec::Principal => ScalarValueRef::Principal(
572            Principal::try_from_bytes(payload)
573                .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))?,
574        ),
575        ScalarCodec::Subaccount => {
576            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
577                InternalError::persisted_row_field_payload_exact_len_required(
578                    field_name,
579                    "subaccount",
580                    SCALAR_SUBACCOUNT_PAYLOAD_LEN,
581                )
582            })?;
583            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
584        }
585        ScalarCodec::Text => {
586            let value = str::from_utf8(payload).map_err(|err| {
587                InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
588            })?;
589            ScalarValueRef::Text(value)
590        }
591        ScalarCodec::Timestamp => {
592            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
593                InternalError::persisted_row_field_payload_exact_len_required(
594                    field_name,
595                    "timestamp",
596                    SCALAR_WORD64_PAYLOAD_LEN,
597                )
598            })?;
599            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
600        }
601        ScalarCodec::Uint64 => {
602            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
603                InternalError::persisted_row_field_payload_exact_len_required(
604                    field_name,
605                    "uint",
606                    SCALAR_WORD64_PAYLOAD_LEN,
607                )
608            })?;
609            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
610        }
611        ScalarCodec::Ulid => {
612            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
613                InternalError::persisted_row_field_payload_exact_len_required(
614                    field_name,
615                    "ulid",
616                    SCALAR_ULID_PAYLOAD_LEN,
617                )
618            })?;
619            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
620        }
621        ScalarCodec::Unit => {
622            if !payload.is_empty() {
623                return Err(InternalError::persisted_row_field_payload_must_be_empty(
624                    field_name, "unit",
625                ));
626            }
627            ScalarValueRef::Unit
628        }
629    };
630
631    Ok(ScalarSlotValueRef::Value(value))
632}
633
// Implement `PersistedScalar` for signed integer types.
// Every listed type is persisted as a little-endian `i64`; decoding narrows
// back to the target type and rejects values that do not fit.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "int",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = i64::from_le_bytes(raw);

                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "integer",
                        )
                    })
                }
            }
        )*
    };
}
666
// Implement `PersistedScalar` for unsigned integer types.
// Every listed type is persisted as a little-endian `u64`; decoding narrows
// back to the target type and rejects values that do not fit.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; SCALAR_WORD64_PAYLOAD_LEN]>::try_from(bytes) else {
                        return Err(
                            InternalError::persisted_row_field_payload_exact_len_required(
                                field_name,
                                "uint",
                                SCALAR_WORD64_PAYLOAD_LEN,
                            ),
                        );
                    };
                    let widened = u64::from_le_bytes(raw);

                    <$ty>::try_from(widened).map_err(|_| {
                        InternalError::persisted_row_field_payload_out_of_range(
                            field_name,
                            "unsigned",
                        )
                    })
                }
            }
        )*
    };
}
699
700impl_persisted_scalar_signed!(i8, i16, i32, i64);
701impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
702
703impl PersistedScalar for bool {
704    const CODEC: ScalarCodec = ScalarCodec::Bool;
705
706    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
707        Ok(vec![u8::from(*self)])
708    }
709
710    fn decode_scalar_payload(
711        bytes: &[u8],
712        field_name: &'static str,
713    ) -> Result<Self, InternalError> {
714        let [value] = bytes else {
715            return Err(
716                InternalError::persisted_row_field_payload_exact_len_required(
717                    field_name,
718                    "bool",
719                    SCALAR_BOOL_PAYLOAD_LEN,
720                ),
721            );
722        };
723
724        match *value {
725            SCALAR_BOOL_FALSE_TAG => Ok(false),
726            SCALAR_BOOL_TRUE_TAG => Ok(true),
727            _ => Err(InternalError::persisted_row_field_payload_invalid_byte(
728                field_name, "bool", *value,
729            )),
730        }
731    }
732}
733
734impl PersistedScalar for String {
735    const CODEC: ScalarCodec = ScalarCodec::Text;
736
737    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
738        Ok(self.as_bytes().to_vec())
739    }
740
741    fn decode_scalar_payload(
742        bytes: &[u8],
743        field_name: &'static str,
744    ) -> Result<Self, InternalError> {
745        str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
746            InternalError::persisted_row_field_text_payload_invalid_utf8(field_name, err)
747        })
748    }
749}
750
751impl PersistedScalar for Vec<u8> {
752    const CODEC: ScalarCodec = ScalarCodec::Blob;
753
754    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
755        Ok(self.clone())
756    }
757
758    fn decode_scalar_payload(
759        bytes: &[u8],
760        _field_name: &'static str,
761    ) -> Result<Self, InternalError> {
762        Ok(bytes.to_vec())
763    }
764}
765
766impl PersistedScalar for Blob {
767    const CODEC: ScalarCodec = ScalarCodec::Blob;
768
769    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
770        Ok(self.to_vec())
771    }
772
773    fn decode_scalar_payload(
774        bytes: &[u8],
775        _field_name: &'static str,
776    ) -> Result<Self, InternalError> {
777        Ok(Self::from(bytes))
778    }
779}
780
781impl PersistedScalar for Ulid {
782    const CODEC: ScalarCodec = ScalarCodec::Ulid;
783
784    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
785        Ok(self.to_bytes().to_vec())
786    }
787
788    fn decode_scalar_payload(
789        bytes: &[u8],
790        field_name: &'static str,
791    ) -> Result<Self, InternalError> {
792        Self::try_from_bytes(bytes)
793            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
794    }
795}
796
797impl PersistedScalar for Timestamp {
798    const CODEC: ScalarCodec = ScalarCodec::Timestamp;
799
800    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
801        Ok(self.as_millis().to_le_bytes().to_vec())
802    }
803
804    fn decode_scalar_payload(
805        bytes: &[u8],
806        field_name: &'static str,
807    ) -> Result<Self, InternalError> {
808        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
809            InternalError::persisted_row_field_payload_exact_len_required(
810                field_name,
811                "timestamp",
812                SCALAR_WORD64_PAYLOAD_LEN,
813            )
814        })?;
815
816        Ok(Self::from_millis(i64::from_le_bytes(raw)))
817    }
818}
819
820impl PersistedScalar for Date {
821    const CODEC: ScalarCodec = ScalarCodec::Date;
822
823    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
824        Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
825    }
826
827    fn decode_scalar_payload(
828        bytes: &[u8],
829        field_name: &'static str,
830    ) -> Result<Self, InternalError> {
831        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
832            InternalError::persisted_row_field_payload_exact_len_required(
833                field_name,
834                "date",
835                SCALAR_WORD32_PAYLOAD_LEN,
836            )
837        })?;
838
839        Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
840    }
841}
842
843impl PersistedScalar for Duration {
844    const CODEC: ScalarCodec = ScalarCodec::Duration;
845
846    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
847        Ok(self.as_millis().to_le_bytes().to_vec())
848    }
849
850    fn decode_scalar_payload(
851        bytes: &[u8],
852        field_name: &'static str,
853    ) -> Result<Self, InternalError> {
854        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
855            InternalError::persisted_row_field_payload_exact_len_required(
856                field_name,
857                "duration",
858                SCALAR_WORD64_PAYLOAD_LEN,
859            )
860        })?;
861
862        Ok(Self::from_millis(u64::from_le_bytes(raw)))
863    }
864}
865
866impl PersistedScalar for Float32 {
867    const CODEC: ScalarCodec = ScalarCodec::Float32;
868
869    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
870        Ok(self.get().to_bits().to_le_bytes().to_vec())
871    }
872
873    fn decode_scalar_payload(
874        bytes: &[u8],
875        field_name: &'static str,
876    ) -> Result<Self, InternalError> {
877        let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
878            InternalError::persisted_row_field_payload_exact_len_required(
879                field_name,
880                "float32",
881                SCALAR_WORD32_PAYLOAD_LEN,
882            )
883        })?;
884        let value = f32::from_bits(u32::from_le_bytes(raw));
885
886        Self::try_new(value).ok_or_else(|| {
887            InternalError::persisted_row_field_payload_non_finite(field_name, "float32")
888        })
889    }
890}
891
892impl PersistedScalar for Float64 {
893    const CODEC: ScalarCodec = ScalarCodec::Float64;
894
895    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
896        Ok(self.get().to_bits().to_le_bytes().to_vec())
897    }
898
899    fn decode_scalar_payload(
900        bytes: &[u8],
901        field_name: &'static str,
902    ) -> Result<Self, InternalError> {
903        let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
904            InternalError::persisted_row_field_payload_exact_len_required(
905                field_name,
906                "float64",
907                SCALAR_WORD64_PAYLOAD_LEN,
908            )
909        })?;
910        let value = f64::from_bits(u64::from_le_bytes(raw));
911
912        Self::try_new(value).ok_or_else(|| {
913            InternalError::persisted_row_field_payload_non_finite(field_name, "float64")
914        })
915    }
916}
917
918impl PersistedScalar for Principal {
919    const CODEC: ScalarCodec = ScalarCodec::Principal;
920
921    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
922        self.to_bytes()
923            .map_err(|err| InternalError::persisted_row_field_encode_failed("principal", err))
924    }
925
926    fn decode_scalar_payload(
927        bytes: &[u8],
928        field_name: &'static str,
929    ) -> Result<Self, InternalError> {
930        Self::try_from_bytes(bytes)
931            .map_err(|err| InternalError::persisted_row_field_decode_failed(field_name, err))
932    }
933}
934
935impl PersistedScalar for Subaccount {
936    const CODEC: ScalarCodec = ScalarCodec::Subaccount;
937
938    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
939        Ok(self.to_bytes().to_vec())
940    }
941
942    fn decode_scalar_payload(
943        bytes: &[u8],
944        field_name: &'static str,
945    ) -> Result<Self, InternalError> {
946        let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
947            InternalError::persisted_row_field_payload_exact_len_required(
948                field_name,
949                "subaccount",
950                SCALAR_SUBACCOUNT_PAYLOAD_LEN,
951            )
952        })?;
953
954        Ok(Self::from_array(raw))
955    }
956}
957
958impl PersistedScalar for () {
959    const CODEC: ScalarCodec = ScalarCodec::Unit;
960
961    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
962        Ok(Vec::new())
963    }
964
965    fn decode_scalar_payload(
966        bytes: &[u8],
967        field_name: &'static str,
968    ) -> Result<Self, InternalError> {
969        if !bytes.is_empty() {
970            return Err(InternalError::persisted_row_field_payload_must_be_empty(
971                field_name, "unit",
972            ));
973        }
974
975        Ok(())
976    }
977}
978
979impl PersistedScalar for Unit {
980    const CODEC: ScalarCodec = ScalarCodec::Unit;
981
982    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
983        Ok(Vec::new())
984    }
985
986    fn decode_scalar_payload(
987        bytes: &[u8],
988        field_name: &'static str,
989    ) -> Result<Self, InternalError> {
990        if !bytes.is_empty() {
991            return Err(InternalError::persisted_row_field_payload_must_be_empty(
992                field_name, "unit",
993            ));
994        }
995
996        Ok(Self)
997    }
998}