1use crate::{
8 db::data::{
9 DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10 decode_storage_key_field_bytes, decode_structural_field_by_kind_bytes,
11 decode_structural_value_storage_bytes,
12 },
13 error::InternalError,
14 model::{
15 entity::{EntityModel, resolve_primary_key_slot},
16 field::{FieldModel, LeafCodec, ScalarCodec},
17 },
18 serialize::{deserialize, serialize},
19 traits::EntityKind,
20 types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
21 value::{StorageKey, Value},
22};
23use std::str;
24
// Two-byte envelope header written in front of every scalar slot payload:
// a fixed prefix byte followed by a tag that distinguishes null from a value.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
const SCALAR_SLOT_TAG_NULL: u8 = 0;
const SCALAR_SLOT_TAG_VALUE: u8 = 1;

// Exact payload lengths (in bytes) enforced when decoding fixed-width scalars.
const SCALAR_BOOL_PAYLOAD_LEN: usize = 1;
const SCALAR_WORD32_PAYLOAD_LEN: usize = 4;
const SCALAR_WORD64_PAYLOAD_LEN: usize = 8;
const SCALAR_ULID_PAYLOAD_LEN: usize = 16;
const SCALAR_SUBACCOUNT_PAYLOAD_LEN: usize = 32;

// The only byte values accepted as a bool payload.
const SCALAR_BOOL_FALSE_TAG: u8 = 0;
const SCALAR_BOOL_TRUE_TAG: u8 = 1;
37
/// Read access to the per-slot payloads of one persisted row.
///
/// Slots are addressed by index into the field list of the reader's entity model.
pub trait SlotReader {
    /// Returns the entity model this reader resolves slots against.
    fn model(&self) -> &'static EntityModel;

    /// Reports whether `slot` carries a payload.
    fn has(&self, slot: usize) -> bool;

    /// Returns the raw payload bytes stored for `slot`, if any.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;

    /// Decodes `slot` as a borrowed scalar view.
    ///
    /// `StructuralSlotReader` returns `Ok(None)` when the slot has no payload or the
    /// field is not scalar-coded; other implementors presumably follow the same
    /// contract (confirm against each implementation).
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;

    /// Decodes `slot` into an owned `Value`.
    ///
    /// Takes `&mut self`, which lets implementors cache decoded values
    /// (as `StructuralSlotReader` does).
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
}
62
63pub trait SlotWriter {
71 fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
73
74 fn write_scalar(
76 &mut self,
77 slot: usize,
78 value: ScalarSlotValueRef<'_>,
79 ) -> Result<(), InternalError> {
80 let payload = encode_scalar_slot_value(value);
81
82 self.write_slot(slot, Some(payload.as_slice()))
83 }
84}
85
86pub trait PersistedRow: EntityKind + Sized {
96 fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;
98
99 fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;
101
102 fn project_slot(slots: &mut dyn SlotReader, slot: usize) -> Result<Option<Value>, InternalError>
104 where
105 Self: crate::traits::FieldProjection,
106 {
107 let entity = Self::materialize_from_slots(slots)?;
108
109 Ok(<Self as crate::traits::FieldProjection>::get_value_by_index(&entity, slot))
110 }
111}
112
113pub(in crate::db) fn decode_slot_value_by_contract(
116 slots: &dyn SlotReader,
117 slot: usize,
118) -> Result<Option<Value>, InternalError> {
119 let field = slots.model().fields().get(slot).ok_or_else(|| {
120 InternalError::index_invariant(format!(
121 "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
122 slots.model().path(),
123 ))
124 })?;
125
126 if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
127 && let Some(value) = slots.get_scalar(slot)?
128 {
129 return Ok(Some(match value {
130 ScalarSlotValueRef::Null => Value::Null,
131 ScalarSlotValueRef::Value(value) => value.into_value(),
132 }));
133 }
134
135 match slots.get_bytes(slot) {
136 Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
137 None => Ok(None),
138 }
139}
140
141fn decode_non_scalar_slot_value(
144 raw_value: &[u8],
145 field: &FieldModel,
146) -> Result<Value, InternalError> {
147 let decoded = match field.storage_decode() {
148 crate::model::field::FieldStorageDecode::ByKind => {
149 decode_structural_field_by_kind_bytes(raw_value, field.kind())
150 }
151 crate::model::field::FieldStorageDecode::Value => {
152 decode_structural_value_storage_bytes(raw_value)
153 }
154 };
155
156 decoded.map_err(|err| {
157 InternalError::serialize_corruption(format!(
158 "row decode failed for field '{}' kind={:?}: {err}",
159 field.name(),
160 field.kind(),
161 ))
162 })
163}
164
/// Copyable view of one decoded scalar slot value.
///
/// `Blob` and `Text` borrow from the underlying payload bytes; every other variant
/// carries its value inline. `into_value` converts the view into an owned `Value`.
#[derive(Clone, Copy, Debug)]
pub enum ScalarValueRef<'a> {
    Blob(&'a [u8]),
    Bool(bool),
    Date(Date),
    Duration(Duration),
    Float32(Float32),
    Float64(Float64),
    Int(i64),
    Principal(Principal),
    Subaccount(Subaccount),
    Text(&'a str),
    Timestamp(Timestamp),
    Uint(u64),
    Ulid(Ulid),
    Unit,
}
191
192impl ScalarValueRef<'_> {
193 #[must_use]
195 pub fn into_value(self) -> Value {
196 match self {
197 Self::Blob(value) => Value::Blob(value.to_vec()),
198 Self::Bool(value) => Value::Bool(value),
199 Self::Date(value) => Value::Date(value),
200 Self::Duration(value) => Value::Duration(value),
201 Self::Float32(value) => Value::Float32(value),
202 Self::Float64(value) => Value::Float64(value),
203 Self::Int(value) => Value::Int(value),
204 Self::Principal(value) => Value::Principal(value),
205 Self::Subaccount(value) => Value::Subaccount(value),
206 Self::Text(value) => Value::Text(value.to_owned()),
207 Self::Timestamp(value) => Value::Timestamp(value),
208 Self::Uint(value) => Value::Uint(value),
209 Self::Ulid(value) => Value::Ulid(value),
210 Self::Unit => Value::Unit,
211 }
212 }
213}
214
/// Decoded content of one scalar slot: either an explicit null marker or a scalar value.
///
/// `Null` corresponds to the null-tagged envelope; `Value` wraps the decoded scalar.
#[derive(Clone, Copy, Debug)]
pub enum ScalarSlotValueRef<'a> {
    Null,
    Value(ScalarValueRef<'a>),
}
229
/// Typed scalar that can be stored in a scalar slot.
///
/// Implementors encode and decode only the payload body; the two-byte slot envelope
/// is added and removed by the `*_persisted_scalar_slot_payload` helpers.
pub trait PersistedScalar: Sized {
    /// Scalar codec associated with this type.
    const CODEC: ScalarCodec;

    /// Encodes `self` into payload bytes (without the slot envelope).
    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;

    /// Decodes payload bytes (without the slot envelope) into `Self`.
    ///
    /// `field_name` is used by the in-file implementations to label error messages.
    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
    -> Result<Self, InternalError>;
}
250
251pub fn encode_persisted_slot_payload<T>(
253 value: &T,
254 field_name: &'static str,
255) -> Result<Vec<u8>, InternalError>
256where
257 T: serde::Serialize,
258{
259 serialize(value).map_err(|err| {
260 InternalError::serialize_internal(format!(
261 "row encode failed for field '{field_name}': {err}",
262 ))
263 })
264}
265
266pub fn encode_persisted_scalar_slot_payload<T>(
268 value: &T,
269 field_name: &'static str,
270) -> Result<Vec<u8>, InternalError>
271where
272 T: PersistedScalar,
273{
274 let payload = value.encode_scalar_payload()?;
275 let mut encoded = Vec::with_capacity(payload.len() + 2);
276 encoded.push(SCALAR_SLOT_PREFIX);
277 encoded.push(SCALAR_SLOT_TAG_VALUE);
278 encoded.extend_from_slice(&payload);
279
280 if encoded.len() < 2 {
281 return Err(InternalError::serialize_internal(format!(
282 "row encode failed for field '{field_name}': scalar payload envelope underflow",
283 )));
284 }
285
286 Ok(encoded)
287}
288
289pub fn encode_persisted_option_scalar_slot_payload<T>(
291 value: &Option<T>,
292 field_name: &'static str,
293) -> Result<Vec<u8>, InternalError>
294where
295 T: PersistedScalar,
296{
297 match value {
298 Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
299 None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
300 }
301}
302
303pub fn decode_persisted_slot_payload<T>(
305 bytes: &[u8],
306 field_name: &'static str,
307) -> Result<T, InternalError>
308where
309 T: serde::de::DeserializeOwned,
310{
311 deserialize(bytes).map_err(|err| {
312 InternalError::serialize_corruption(format!(
313 "row decode failed for field '{field_name}': {err}",
314 ))
315 })
316}
317
318pub fn decode_persisted_scalar_slot_payload<T>(
320 bytes: &[u8],
321 field_name: &'static str,
322) -> Result<T, InternalError>
323where
324 T: PersistedScalar,
325{
326 let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
327 InternalError::serialize_corruption(format!(
328 "row decode failed for field '{field_name}': unexpected null for non-nullable scalar field",
329 ))
330 })?;
331
332 T::decode_scalar_payload(payload, field_name)
333}
334
335pub fn decode_persisted_option_scalar_slot_payload<T>(
337 bytes: &[u8],
338 field_name: &'static str,
339) -> Result<Option<T>, InternalError>
340where
341 T: PersistedScalar,
342{
343 let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
344 return Ok(None);
345 };
346
347 T::decode_scalar_payload(payload, field_name).map(Some)
348}
349
/// In-memory `SlotWriter` that collects one optional payload per slot and
/// assembles them into a single encoded row in `finish`.
pub(in crate::db) struct SlotBufferWriter {
    // One entry per model field; `None` means no payload has been written for that slot.
    slots: Vec<Option<Vec<u8>>>,
}
360
361impl SlotBufferWriter {
362 pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
364 Self {
365 slots: vec![None; model.fields().len()],
366 }
367 }
368
369 pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
371 let field_count = u16::try_from(self.slots.len()).map_err(|_| {
372 InternalError::serialize_internal(format!(
373 "row encode failed: field count {} exceeds u16 slot table capacity",
374 self.slots.len(),
375 ))
376 })?;
377 let mut payload_bytes = Vec::new();
378 let mut slot_table = Vec::with_capacity(self.slots.len());
379
380 for slot_payload in self.slots {
382 match slot_payload {
383 Some(bytes) => {
384 let start = u32::try_from(payload_bytes.len()).map_err(|_| {
385 InternalError::serialize_internal(
386 "row encode failed: slot payload start exceeds u32 range",
387 )
388 })?;
389 let len = u32::try_from(bytes.len()).map_err(|_| {
390 InternalError::serialize_internal(
391 "row encode failed: slot payload length exceeds u32 range",
392 )
393 })?;
394 payload_bytes.extend_from_slice(&bytes);
395 slot_table.push((start, len));
396 }
397 None => slot_table.push((0, 0)),
398 }
399 }
400
401 let mut encoded = Vec::with_capacity(
403 usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
404 );
405 encoded.extend_from_slice(&field_count.to_be_bytes());
406 for (start, len) in slot_table {
407 encoded.extend_from_slice(&start.to_be_bytes());
408 encoded.extend_from_slice(&len.to_be_bytes());
409 }
410 encoded.extend_from_slice(&payload_bytes);
411
412 Ok(encoded)
413 }
414}
415
416impl SlotWriter for SlotBufferWriter {
417 fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
418 let entry = self.slots.get_mut(slot).ok_or_else(|| {
419 InternalError::serialize_internal(format!(
420 "row encode failed: slot {slot} is outside the row layout",
421 ))
422 })?;
423 *entry = payload.map(<[u8]>::to_vec);
424
425 Ok(())
426 }
427}
428
429pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
431where
432 E: PersistedRow,
433{
434 let mut writer = SlotBufferWriter::for_model(E::MODEL);
435 entity.write_slots(&mut writer)?;
436 writer.finish()
437}
438
/// `SlotReader` over one raw row, decoding slots on demand against an entity model.
pub(in crate::db) struct StructuralSlotReader<'a> {
    // Model whose field list defines the slot layout.
    model: &'static EntityModel,
    // Per-slot byte views derived from the raw row.
    field_bytes: StructuralRowFieldBytes<'a>,
    // One cache entry per model field, filled by `get_value`.
    cached_values: Vec<CachedSlotValue>,
}
453
454impl<'a> StructuralSlotReader<'a> {
455 pub(in crate::db) fn from_raw_row(
457 raw_row: &'a RawRow,
458 model: &'static EntityModel,
459 ) -> Result<Self, InternalError> {
460 let field_bytes = StructuralRowFieldBytes::from_raw_row(raw_row, model)
461 .map_err(StructuralRowDecodeError::into_internal_error)?;
462 let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
463 .take(model.fields().len())
464 .collect();
465
466 Ok(Self {
467 model,
468 field_bytes,
469 cached_values,
470 })
471 }
472
473 pub(in crate::db) fn validate_storage_key(
475 &self,
476 data_key: &DataKey,
477 ) -> Result<(), InternalError> {
478 let Some(primary_key_slot) = resolve_primary_key_slot(self.model) else {
479 return Err(InternalError::index_invariant(format!(
480 "entity primary key field missing during structural row validation: {}",
481 self.model.path(),
482 )));
483 };
484 let field = self.field_model(primary_key_slot)?;
485 let decoded_key = match self.get_scalar(primary_key_slot)? {
486 Some(ScalarSlotValueRef::Null) => None,
487 Some(ScalarSlotValueRef::Value(value)) => storage_key_from_scalar_ref(value),
488 None => match self.field_bytes.field(primary_key_slot) {
489 Some(raw_value) => Some(
490 decode_storage_key_field_bytes(raw_value, field.kind).map_err(|err| {
491 InternalError::serialize_corruption(format!(
492 "row decode failed: primary-key value is not storage-key encodable: {data_key} ({err})",
493 ))
494 })?,
495 ),
496 None => None,
497 },
498 };
499 let Some(decoded_key) = decoded_key else {
500 return Err(InternalError::serialize_corruption(format!(
501 "row decode failed: missing primary-key slot while validating {data_key}",
502 )));
503 };
504 let expected_key = data_key.storage_key();
505
506 if decoded_key != expected_key {
507 return Err(InternalError::store_corruption(format!(
508 "row key mismatch: expected {expected_key}, found {decoded_key}",
509 )));
510 }
511
512 Ok(())
513 }
514
515 fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
517 self.model.fields().get(slot).ok_or_else(|| {
518 InternalError::index_invariant(format!(
519 "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
520 self.model.path(),
521 ))
522 })
523 }
524}
525
526const fn storage_key_from_scalar_ref(value: ScalarValueRef<'_>) -> Option<StorageKey> {
529 match value {
530 ScalarValueRef::Int(value) => Some(StorageKey::Int(value)),
531 ScalarValueRef::Principal(value) => Some(StorageKey::Principal(value)),
532 ScalarValueRef::Subaccount(value) => Some(StorageKey::Subaccount(value)),
533 ScalarValueRef::Timestamp(value) => Some(StorageKey::Timestamp(value)),
534 ScalarValueRef::Uint(value) => Some(StorageKey::Uint(value)),
535 ScalarValueRef::Ulid(value) => Some(StorageKey::Ulid(value)),
536 ScalarValueRef::Unit => Some(StorageKey::Unit),
537 _ => None,
538 }
539}
540
541impl SlotReader for StructuralSlotReader<'_> {
542 fn model(&self) -> &'static EntityModel {
543 self.model
544 }
545
546 fn has(&self, slot: usize) -> bool {
547 self.field_bytes.field(slot).is_some()
548 }
549
550 fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
551 self.field_bytes.field(slot)
552 }
553
554 fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
555 let field = self.field_model(slot)?;
556 let Some(raw_value) = self.field_bytes.field(slot) else {
557 return Ok(None);
558 };
559
560 match field.leaf_codec() {
561 LeafCodec::Scalar(codec) => {
562 decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
563 }
564 LeafCodec::CborFallback => Ok(None),
565 }
566 }
567
568 fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
569 let cached = self.cached_values.get(slot).ok_or_else(|| {
570 InternalError::index_invariant(format!(
571 "slot cache lookup outside model bounds during structural row access: model='{}' slot={slot}",
572 self.model.path(),
573 ))
574 })?;
575 if let CachedSlotValue::Decoded(value) = cached {
576 return Ok(value.clone());
577 }
578
579 let field = self.field_model(slot)?;
580 let value = match self.get_scalar(slot)? {
581 Some(ScalarSlotValueRef::Null) => Some(Value::Null),
582 Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
583 None => match self.field_bytes.field(slot) {
584 Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
585 None => None,
586 },
587 };
588 self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
589
590 Ok(value)
591 }
592}
593
/// Per-slot cache state used by `StructuralSlotReader::get_value`.
enum CachedSlotValue {
    // The slot has not been decoded yet.
    Pending,
    // The slot was decoded; `None` records that it produced no value.
    Decoded(Option<Value>),
}
605
606fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
608 match value {
609 ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
610 ScalarSlotValueRef::Value(value) => {
611 let mut encoded = Vec::new();
612 encoded.push(SCALAR_SLOT_PREFIX);
613 encoded.push(SCALAR_SLOT_TAG_VALUE);
614
615 match value {
616 ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
617 ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
618 ScalarValueRef::Date(value) => {
619 encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
620 }
621 ScalarValueRef::Duration(value) => {
622 encoded.extend_from_slice(&value.as_millis().to_le_bytes());
623 }
624 ScalarValueRef::Float32(value) => {
625 encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
626 }
627 ScalarValueRef::Float64(value) => {
628 encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
629 }
630 ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
631 ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
632 ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
633 ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
634 ScalarValueRef::Timestamp(value) => {
635 encoded.extend_from_slice(&value.as_millis().to_le_bytes());
636 }
637 ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
638 ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
639 ScalarValueRef::Unit => {}
640 }
641
642 encoded
643 }
644 }
645}
646
647fn decode_scalar_slot_payload_body<'a>(
649 bytes: &'a [u8],
650 field_name: &'static str,
651) -> Result<Option<&'a [u8]>, InternalError> {
652 let Some((&prefix, rest)) = bytes.split_first() else {
653 return Err(InternalError::serialize_corruption(format!(
654 "row decode failed for field '{field_name}': empty scalar payload",
655 )));
656 };
657 if prefix != SCALAR_SLOT_PREFIX {
658 return Err(InternalError::serialize_corruption(format!(
659 "row decode failed for field '{field_name}': scalar payload prefix mismatch",
660 )));
661 }
662 let Some((&tag, payload)) = rest.split_first() else {
663 return Err(InternalError::serialize_corruption(format!(
664 "row decode failed for field '{field_name}': truncated scalar payload tag",
665 )));
666 };
667
668 match tag {
669 SCALAR_SLOT_TAG_NULL => {
670 if !payload.is_empty() {
671 return Err(InternalError::serialize_corruption(format!(
672 "row decode failed for field '{field_name}': null scalar payload has trailing bytes",
673 )));
674 }
675
676 Ok(None)
677 }
678 SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
679 _ => Err(InternalError::serialize_corruption(format!(
680 "row decode failed for field '{field_name}': invalid scalar payload tag {tag}",
681 ))),
682 }
683}
684
/// Decodes an enveloped scalar slot into a borrowed scalar view for `codec`.
///
/// A null-tagged envelope yields `ScalarSlotValueRef::Null`. Otherwise the payload
/// is validated against the codec (exact length for fixed-width codecs, UTF-8 for
/// text, finiteness for floats) and returned as `ScalarSlotValueRef::Value`.
///
/// # Errors
///
/// Returns serialize corruption naming `field_name` when the envelope or the
/// payload is malformed.
#[expect(clippy::too_many_lines)]
fn decode_scalar_slot_value<'a>(
    bytes: &'a [u8],
    codec: ScalarCodec,
    field_name: &'static str,
) -> Result<ScalarSlotValueRef<'a>, InternalError> {
    // Strip the envelope first; a null tag short-circuits before any codec check.
    let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
        return Ok(ScalarSlotValueRef::Null);
    };

    let value = match codec {
        // Blob payloads are borrowed as-is.
        ScalarCodec::Blob => ScalarValueRef::Blob(payload),
        ScalarCodec::Bool => {
            // Exactly one byte, and only the two known tag values are accepted.
            let [value] = payload else {
                return Err(InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': bool payload must be exactly {SCALAR_BOOL_PAYLOAD_LEN} byte",
                )));
            };
            match *value {
                SCALAR_BOOL_FALSE_TAG => ScalarValueRef::Bool(false),
                SCALAR_BOOL_TRUE_TAG => ScalarValueRef::Bool(true),
                _ => {
                    return Err(InternalError::serialize_corruption(format!(
                        "row decode failed for field '{field_name}': invalid bool payload byte {value}",
                    )));
                }
            }
        }
        ScalarCodec::Date => {
            // Little-endian i32 passed to `Date::from_days_since_epoch`.
            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': date payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
        }
        ScalarCodec::Duration => {
            // Little-endian u64 passed to `Duration::from_millis`.
            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': duration payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
        }
        ScalarCodec::Float32 => {
            // Little-endian IEEE-754 bits; values rejected by `Float32::try_new` are corruption.
            let bytes: [u8; SCALAR_WORD32_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': float32 payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
                ))
            })?;
            let value = f32::from_bits(u32::from_le_bytes(bytes));
            let value = Float32::try_new(value).ok_or_else(|| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': float32 payload is non-finite",
                ))
            })?;
            ScalarValueRef::Float32(value)
        }
        ScalarCodec::Float64 => {
            // Little-endian IEEE-754 bits; values rejected by `Float64::try_new` are corruption.
            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': float64 payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                ))
            })?;
            let value = f64::from_bits(u64::from_le_bytes(bytes));
            let value = Float64::try_new(value).ok_or_else(|| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': float64 payload is non-finite",
                ))
            })?;
            ScalarValueRef::Float64(value)
        }
        ScalarCodec::Int64 => {
            // Little-endian i64.
            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': int payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Int(i64::from_le_bytes(bytes))
        }
        ScalarCodec::Principal => {
            // Validation is delegated to `Principal::try_from_bytes`.
            ScalarValueRef::Principal(Principal::try_from_bytes(payload).map_err(|err| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': {err}",
                ))
            })?)
        }
        ScalarCodec::Subaccount => {
            // Fixed-width byte array.
            let bytes: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': subaccount payload must be exactly {SCALAR_SUBACCOUNT_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
        }
        ScalarCodec::Text => {
            // Text is borrowed from the payload after UTF-8 validation.
            let value = str::from_utf8(payload).map_err(|err| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
                ))
            })?;
            ScalarValueRef::Text(value)
        }
        ScalarCodec::Timestamp => {
            // Little-endian i64 passed to `Timestamp::from_millis`.
            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': timestamp payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
        }
        ScalarCodec::Uint64 => {
            // Little-endian u64.
            let bytes: [u8; SCALAR_WORD64_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': uint payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Uint(u64::from_le_bytes(bytes))
        }
        ScalarCodec::Ulid => {
            // Fixed-width byte array.
            let bytes: [u8; SCALAR_ULID_PAYLOAD_LEN] = payload.try_into().map_err(|_| {
                InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': ulid payload must be exactly {SCALAR_ULID_PAYLOAD_LEN} bytes",
                ))
            })?;
            ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
        }
        ScalarCodec::Unit => {
            // Unit has no payload; any bytes after the header are corruption.
            if !payload.is_empty() {
                return Err(InternalError::serialize_corruption(format!(
                    "row decode failed for field '{field_name}': unit payload must be empty",
                )));
            }
            ScalarValueRef::Unit
        }
    };

    Ok(ScalarSlotValueRef::Value(value))
}
825
// Implements `PersistedScalar` for signed integer types through the `Int64` codec.
//
// Every listed type is widened to `i64` for storage and narrowed back with a
// range check on decode.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                // Widen to i64 and store the little-endian bytes.
                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    Ok(i64::from(*self).to_le_bytes().to_vec())
                }

                // Require exactly eight bytes, then narrow back to the target type.
                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': int payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                        ))
                    })?;
                    // A stored value that does not fit the target width is treated as corruption.
                    <$ty>::try_from(i64::from_le_bytes(raw)).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': integer payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
855
// Implements `PersistedScalar` for unsigned integer types through the `Uint64` codec.
//
// Every listed type is widened to `u64` for storage and narrowed back with a
// range check on decode.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                // Widen to u64 and store the little-endian bytes.
                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    Ok(u64::from(*self).to_le_bytes().to_vec())
                }

                // Require exactly eight bytes, then narrow back to the target type.
                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': uint payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
                        ))
                    })?;
                    // A stored value that does not fit the target width is treated as corruption.
                    <$ty>::try_from(u64::from_le_bytes(raw)).map_err(|_| {
                        InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': unsigned payload out of range for target type",
                        ))
                    })
                }
            }
        )*
    };
}
885
// Signed widths are stored through the `Int64` codec and unsigned widths through
// `Uint64`; the macros generate the widening encode and the range-checked decode.
impl_persisted_scalar_signed!(i8, i16, i32, i64);
impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
888
889impl PersistedScalar for bool {
890 const CODEC: ScalarCodec = ScalarCodec::Bool;
891
892 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
893 Ok(vec![u8::from(*self)])
894 }
895
896 fn decode_scalar_payload(
897 bytes: &[u8],
898 field_name: &'static str,
899 ) -> Result<Self, InternalError> {
900 let [value] = bytes else {
901 return Err(InternalError::serialize_corruption(format!(
902 "row decode failed for field '{field_name}': bool payload must be exactly {SCALAR_BOOL_PAYLOAD_LEN} byte",
903 )));
904 };
905
906 match *value {
907 SCALAR_BOOL_FALSE_TAG => Ok(false),
908 SCALAR_BOOL_TRUE_TAG => Ok(true),
909 _ => Err(InternalError::serialize_corruption(format!(
910 "row decode failed for field '{field_name}': invalid bool payload byte {value}",
911 ))),
912 }
913 }
914}
915
916impl PersistedScalar for String {
917 const CODEC: ScalarCodec = ScalarCodec::Text;
918
919 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
920 Ok(self.as_bytes().to_vec())
921 }
922
923 fn decode_scalar_payload(
924 bytes: &[u8],
925 field_name: &'static str,
926 ) -> Result<Self, InternalError> {
927 str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
928 InternalError::serialize_corruption(format!(
929 "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
930 ))
931 })
932 }
933}
934
935impl PersistedScalar for Vec<u8> {
936 const CODEC: ScalarCodec = ScalarCodec::Blob;
937
938 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
939 Ok(self.clone())
940 }
941
942 fn decode_scalar_payload(
943 bytes: &[u8],
944 _field_name: &'static str,
945 ) -> Result<Self, InternalError> {
946 Ok(bytes.to_vec())
947 }
948}
949
950impl PersistedScalar for Blob {
951 const CODEC: ScalarCodec = ScalarCodec::Blob;
952
953 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
954 Ok(self.to_vec())
955 }
956
957 fn decode_scalar_payload(
958 bytes: &[u8],
959 _field_name: &'static str,
960 ) -> Result<Self, InternalError> {
961 Ok(Self::from(bytes))
962 }
963}
964
965impl PersistedScalar for Ulid {
966 const CODEC: ScalarCodec = ScalarCodec::Ulid;
967
968 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
969 Ok(self.to_bytes().to_vec())
970 }
971
972 fn decode_scalar_payload(
973 bytes: &[u8],
974 field_name: &'static str,
975 ) -> Result<Self, InternalError> {
976 Self::try_from_bytes(bytes).map_err(|err| {
977 InternalError::serialize_corruption(format!(
978 "row decode failed for field '{field_name}': {err}",
979 ))
980 })
981 }
982}
983
984impl PersistedScalar for Timestamp {
985 const CODEC: ScalarCodec = ScalarCodec::Timestamp;
986
987 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
988 Ok(self.as_millis().to_le_bytes().to_vec())
989 }
990
991 fn decode_scalar_payload(
992 bytes: &[u8],
993 field_name: &'static str,
994 ) -> Result<Self, InternalError> {
995 let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
996 InternalError::serialize_corruption(format!(
997 "row decode failed for field '{field_name}': timestamp payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
998 ))
999 })?;
1000
1001 Ok(Self::from_millis(i64::from_le_bytes(raw)))
1002 }
1003}
1004
1005impl PersistedScalar for Date {
1006 const CODEC: ScalarCodec = ScalarCodec::Date;
1007
1008 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1009 Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
1010 }
1011
1012 fn decode_scalar_payload(
1013 bytes: &[u8],
1014 field_name: &'static str,
1015 ) -> Result<Self, InternalError> {
1016 let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1017 InternalError::serialize_corruption(format!(
1018 "row decode failed for field '{field_name}': date payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
1019 ))
1020 })?;
1021
1022 Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1023 }
1024}
1025
1026impl PersistedScalar for Duration {
1027 const CODEC: ScalarCodec = ScalarCodec::Duration;
1028
1029 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1030 Ok(self.as_millis().to_le_bytes().to_vec())
1031 }
1032
1033 fn decode_scalar_payload(
1034 bytes: &[u8],
1035 field_name: &'static str,
1036 ) -> Result<Self, InternalError> {
1037 let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1038 InternalError::serialize_corruption(format!(
1039 "row decode failed for field '{field_name}': duration payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1040 ))
1041 })?;
1042
1043 Ok(Self::from_millis(u64::from_le_bytes(raw)))
1044 }
1045}
1046
1047impl PersistedScalar for Float32 {
1048 const CODEC: ScalarCodec = ScalarCodec::Float32;
1049
1050 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1051 Ok(self.get().to_bits().to_le_bytes().to_vec())
1052 }
1053
1054 fn decode_scalar_payload(
1055 bytes: &[u8],
1056 field_name: &'static str,
1057 ) -> Result<Self, InternalError> {
1058 let raw: [u8; SCALAR_WORD32_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1059 InternalError::serialize_corruption(format!(
1060 "row decode failed for field '{field_name}': float32 payload must be exactly {SCALAR_WORD32_PAYLOAD_LEN} bytes",
1061 ))
1062 })?;
1063 let value = f32::from_bits(u32::from_le_bytes(raw));
1064
1065 Self::try_new(value).ok_or_else(|| {
1066 InternalError::serialize_corruption(format!(
1067 "row decode failed for field '{field_name}': float32 payload is non-finite",
1068 ))
1069 })
1070 }
1071}
1072
1073impl PersistedScalar for Float64 {
1074 const CODEC: ScalarCodec = ScalarCodec::Float64;
1075
1076 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1077 Ok(self.get().to_bits().to_le_bytes().to_vec())
1078 }
1079
1080 fn decode_scalar_payload(
1081 bytes: &[u8],
1082 field_name: &'static str,
1083 ) -> Result<Self, InternalError> {
1084 let raw: [u8; SCALAR_WORD64_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1085 InternalError::serialize_corruption(format!(
1086 "row decode failed for field '{field_name}': float64 payload must be exactly {SCALAR_WORD64_PAYLOAD_LEN} bytes",
1087 ))
1088 })?;
1089 let value = f64::from_bits(u64::from_le_bytes(raw));
1090
1091 Self::try_new(value).ok_or_else(|| {
1092 InternalError::serialize_corruption(format!(
1093 "row decode failed for field '{field_name}': float64 payload is non-finite",
1094 ))
1095 })
1096 }
1097}
1098
1099impl PersistedScalar for Principal {
1100 const CODEC: ScalarCodec = ScalarCodec::Principal;
1101
1102 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1103 self.to_bytes().map_err(|err| {
1104 InternalError::serialize_internal(format!(
1105 "row encode failed for principal field: {err}",
1106 ))
1107 })
1108 }
1109
1110 fn decode_scalar_payload(
1111 bytes: &[u8],
1112 field_name: &'static str,
1113 ) -> Result<Self, InternalError> {
1114 Self::try_from_bytes(bytes).map_err(|err| {
1115 InternalError::serialize_corruption(format!(
1116 "row decode failed for field '{field_name}': {err}",
1117 ))
1118 })
1119 }
1120}
1121
1122impl PersistedScalar for Subaccount {
1123 const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1124
1125 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1126 Ok(self.to_bytes().to_vec())
1127 }
1128
1129 fn decode_scalar_payload(
1130 bytes: &[u8],
1131 field_name: &'static str,
1132 ) -> Result<Self, InternalError> {
1133 let raw: [u8; SCALAR_SUBACCOUNT_PAYLOAD_LEN] = bytes.try_into().map_err(|_| {
1134 InternalError::serialize_corruption(format!(
1135 "row decode failed for field '{field_name}': subaccount payload must be exactly {SCALAR_SUBACCOUNT_PAYLOAD_LEN} bytes",
1136 ))
1137 })?;
1138
1139 Ok(Self::from_array(raw))
1140 }
1141}
1142
1143impl PersistedScalar for () {
1144 const CODEC: ScalarCodec = ScalarCodec::Unit;
1145
1146 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1147 Ok(Vec::new())
1148 }
1149
1150 fn decode_scalar_payload(
1151 bytes: &[u8],
1152 field_name: &'static str,
1153 ) -> Result<Self, InternalError> {
1154 if !bytes.is_empty() {
1155 return Err(InternalError::serialize_corruption(format!(
1156 "row decode failed for field '{field_name}': unit payload must be empty",
1157 )));
1158 }
1159
1160 Ok(())
1161 }
1162}