1use crate::{
8 db::data::{
9 DataKey, RawRow, StructuralRowDecodeError, StructuralRowFieldBytes,
10 decode_structural_field_by_kind_bytes, decode_structural_value_storage_bytes,
11 },
12 error::InternalError,
13 model::{
14 entity::{EntityModel, resolve_primary_key_slot},
15 field::{FieldModel, LeafCodec, ScalarCodec},
16 },
17 serialize::{deserialize, serialize},
18 traits::EntityKind,
19 types::{Blob, Date, Duration, Float32, Float64, Principal, Subaccount, Timestamp, Ulid},
20 value::{StorageKey, Value},
21};
22use std::str;
23
// First byte of every scalar slot envelope; decoding rejects any other leading byte.
const SCALAR_SLOT_PREFIX: u8 = 0xFF;
// Second envelope byte marking a null scalar; no payload bytes may follow it.
const SCALAR_SLOT_TAG_NULL: u8 = 0;
// Second envelope byte marking a present scalar; the remaining bytes are its payload.
const SCALAR_SLOT_TAG_VALUE: u8 = 1;
27
/// Slot-indexed read access to one persisted row.
///
/// The per-method notes describe what the `StructuralSlotReader` implementation
/// in this file does; other implementors are not visible from here.
pub trait SlotReader {
    /// Returns the entity model whose field list defines the slot layout.
    fn model(&self) -> &'static EntityModel;

    /// Reports whether `slot` carries payload bytes.
    fn has(&self, slot: usize) -> bool;

    /// Returns the raw payload bytes stored for `slot`, or `None` when there are none.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]>;

    /// Decodes `slot` as a borrowed scalar.
    ///
    /// `Ok(None)` means no scalar view is available: in the structural reader that
    /// is either a slot without bytes or a field whose leaf codec is not scalar.
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError>;

    /// Decodes `slot` into an owned [`Value`].
    ///
    /// Takes `&mut self`; the structural reader uses that to cache decoded values.
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError>;
}
52
53pub trait SlotWriter {
61 fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError>;
63
64 fn write_scalar(
66 &mut self,
67 slot: usize,
68 value: ScalarSlotValueRef<'_>,
69 ) -> Result<(), InternalError> {
70 let payload = encode_scalar_slot_value(value);
71
72 self.write_slot(slot, Some(payload.as_slice()))
73 }
74}
75
/// Entity types that can be written to and rebuilt from slot-addressed rows.
pub trait PersistedRow: EntityKind + Sized {
    /// Rebuilds an entity from the slots exposed by `slots`.
    fn materialize_from_slots(slots: &mut dyn SlotReader) -> Result<Self, InternalError>;

    /// Writes this entity's fields into `out`, one slot per field.
    fn write_slots(&self, out: &mut dyn SlotWriter) -> Result<(), InternalError>;

    /// Decodes a single `slot` into a [`Value`] without materializing the whole entity.
    // NOTE(review): `Ok(None)` presumably means the slot is absent — confirm against implementors.
    fn project_slot(
        slots: &mut dyn SlotReader,
        slot: usize,
    ) -> Result<Option<Value>, InternalError>;
}
98
99pub(in crate::db) fn decode_slot_value_by_contract(
102 slots: &dyn SlotReader,
103 slot: usize,
104) -> Result<Option<Value>, InternalError> {
105 let field = slots.model().fields().get(slot).ok_or_else(|| {
106 InternalError::index_invariant(format!(
107 "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
108 slots.model().path(),
109 ))
110 })?;
111
112 if matches!(field.leaf_codec(), LeafCodec::Scalar(_))
113 && let Some(value) = slots.get_scalar(slot)?
114 {
115 return Ok(Some(match value {
116 ScalarSlotValueRef::Null => Value::Null,
117 ScalarSlotValueRef::Value(value) => value.into_value(),
118 }));
119 }
120
121 match slots.get_bytes(slot) {
122 Some(raw_value) => decode_non_scalar_slot_value(raw_value, field).map(Some),
123 None => Ok(None),
124 }
125}
126
127fn decode_non_scalar_slot_value(
130 raw_value: &[u8],
131 field: &FieldModel,
132) -> Result<Value, InternalError> {
133 let decoded = match field.storage_decode() {
134 crate::model::field::FieldStorageDecode::ByKind => {
135 decode_structural_field_by_kind_bytes(raw_value, field.kind())
136 }
137 crate::model::field::FieldStorageDecode::Value => {
138 decode_structural_value_storage_bytes(raw_value)
139 }
140 };
141
142 decoded.map_err(|err| {
143 InternalError::serialize_corruption(format!(
144 "row decode failed for field '{}' kind={:?}: {err}",
145 field.name(),
146 field.kind(),
147 ))
148 })
149}
150
/// Scalar value decoded from a slot payload.
///
/// `Blob` and `Text` borrow from the underlying bytes; every other variant
/// holds its value directly.
#[derive(Clone, Copy, Debug)]
pub enum ScalarValueRef<'a> {
    /// Borrowed raw bytes.
    Blob(&'a [u8]),
    Bool(bool),
    Date(Date),
    Duration(Duration),
    Float32(Float32),
    Float64(Float64),
    Int(i64),
    Principal(Principal),
    Subaccount(Subaccount),
    /// Borrowed UTF-8 text.
    Text(&'a str),
    Timestamp(Timestamp),
    Uint(u64),
    Ulid(Ulid),
    Unit,
}
177
178impl ScalarValueRef<'_> {
179 #[must_use]
181 pub fn into_value(self) -> Value {
182 match self {
183 Self::Blob(value) => Value::Blob(value.to_vec()),
184 Self::Bool(value) => Value::Bool(value),
185 Self::Date(value) => Value::Date(value),
186 Self::Duration(value) => Value::Duration(value),
187 Self::Float32(value) => Value::Float32(value),
188 Self::Float64(value) => Value::Float64(value),
189 Self::Int(value) => Value::Int(value),
190 Self::Principal(value) => Value::Principal(value),
191 Self::Subaccount(value) => Value::Subaccount(value),
192 Self::Text(value) => Value::Text(value.to_owned()),
193 Self::Timestamp(value) => Value::Timestamp(value),
194 Self::Uint(value) => Value::Uint(value),
195 Self::Ulid(value) => Value::Ulid(value),
196 Self::Unit => Value::Unit,
197 }
198 }
199}
200
/// Content of one scalar slot: either an explicit null or a scalar value.
#[derive(Clone, Copy, Debug)]
pub enum ScalarSlotValueRef<'a> {
    /// The slot envelope carried the null tag.
    Null,
    /// The slot envelope carried a value payload.
    Value(ScalarValueRef<'a>),
}
215
/// Types that can be stored as the payload of a scalar slot envelope.
pub trait PersistedScalar: Sized {
    /// Scalar codec associated with this type.
    const CODEC: ScalarCodec;

    /// Encodes the value as payload bytes, without the envelope prefix and tag.
    fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError>;

    /// Decodes a value from payload bytes (envelope already stripped).
    ///
    /// `field_name` is used only to label decode errors.
    fn decode_scalar_payload(bytes: &[u8], field_name: &'static str)
    -> Result<Self, InternalError>;
}
236
237pub fn encode_persisted_slot_payload<T>(
239 value: &T,
240 field_name: &'static str,
241) -> Result<Vec<u8>, InternalError>
242where
243 T: serde::Serialize,
244{
245 serialize(value).map_err(|err| {
246 InternalError::serialize_internal(format!(
247 "row encode failed for field '{field_name}': {err}",
248 ))
249 })
250}
251
252pub fn encode_persisted_scalar_slot_payload<T>(
254 value: &T,
255 field_name: &'static str,
256) -> Result<Vec<u8>, InternalError>
257where
258 T: PersistedScalar,
259{
260 let payload = value.encode_scalar_payload()?;
261 let mut encoded = Vec::with_capacity(payload.len() + 2);
262 encoded.push(SCALAR_SLOT_PREFIX);
263 encoded.push(SCALAR_SLOT_TAG_VALUE);
264 encoded.extend_from_slice(&payload);
265
266 if encoded.len() < 2 {
267 return Err(InternalError::serialize_internal(format!(
268 "row encode failed for field '{field_name}': scalar payload envelope underflow",
269 )));
270 }
271
272 Ok(encoded)
273}
274
275pub fn encode_persisted_option_scalar_slot_payload<T>(
277 value: &Option<T>,
278 field_name: &'static str,
279) -> Result<Vec<u8>, InternalError>
280where
281 T: PersistedScalar,
282{
283 match value {
284 Some(value) => encode_persisted_scalar_slot_payload(value, field_name),
285 None => Ok(vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL]),
286 }
287}
288
289pub fn decode_persisted_slot_payload<T>(
291 bytes: &[u8],
292 field_name: &'static str,
293) -> Result<T, InternalError>
294where
295 T: serde::de::DeserializeOwned,
296{
297 deserialize(bytes).map_err(|err| {
298 InternalError::serialize_corruption(format!(
299 "row decode failed for field '{field_name}': {err}",
300 ))
301 })
302}
303
304pub fn decode_persisted_scalar_slot_payload<T>(
306 bytes: &[u8],
307 field_name: &'static str,
308) -> Result<T, InternalError>
309where
310 T: PersistedScalar,
311{
312 let payload = decode_scalar_slot_payload_body(bytes, field_name)?.ok_or_else(|| {
313 InternalError::serialize_corruption(format!(
314 "row decode failed for field '{field_name}': unexpected null for non-nullable scalar field",
315 ))
316 })?;
317
318 T::decode_scalar_payload(payload, field_name)
319}
320
321pub fn decode_persisted_option_scalar_slot_payload<T>(
323 bytes: &[u8],
324 field_name: &'static str,
325) -> Result<Option<T>, InternalError>
326where
327 T: PersistedScalar,
328{
329 let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
330 return Ok(None);
331 };
332
333 T::decode_scalar_payload(payload, field_name).map(Some)
334}
335
336#[must_use]
338pub fn missing_persisted_slot_error(field_name: &'static str) -> InternalError {
339 InternalError::serialize_corruption(format!(
340 "row decode failed: missing required field '{field_name}'",
341 ))
342}
343
/// In-memory [`SlotWriter`] that buffers one optional payload per model field
/// until `finish` serializes them into a single row.
pub(in crate::db) struct SlotBufferWriter {
    // One entry per model field; `None` until the slot is written with a payload.
    slots: Vec<Option<Vec<u8>>>,
}
354
355impl SlotBufferWriter {
356 pub(in crate::db) fn for_model(model: &'static EntityModel) -> Self {
358 Self {
359 slots: vec![None; model.fields().len()],
360 }
361 }
362
363 pub(in crate::db) fn finish(self) -> Result<Vec<u8>, InternalError> {
365 let field_count = u16::try_from(self.slots.len()).map_err(|_| {
366 InternalError::serialize_internal(format!(
367 "row encode failed: field count {} exceeds u16 slot table capacity",
368 self.slots.len(),
369 ))
370 })?;
371 let mut payload_bytes = Vec::new();
372 let mut slot_table = Vec::with_capacity(self.slots.len());
373
374 for slot_payload in self.slots {
376 match slot_payload {
377 Some(bytes) => {
378 let start = u32::try_from(payload_bytes.len()).map_err(|_| {
379 InternalError::serialize_internal(
380 "row encode failed: slot payload start exceeds u32 range",
381 )
382 })?;
383 let len = u32::try_from(bytes.len()).map_err(|_| {
384 InternalError::serialize_internal(
385 "row encode failed: slot payload length exceeds u32 range",
386 )
387 })?;
388 payload_bytes.extend_from_slice(&bytes);
389 slot_table.push((start, len));
390 }
391 None => slot_table.push((0, 0)),
392 }
393 }
394
395 let mut encoded = Vec::with_capacity(
397 usize::from(field_count) * (u32::BITS as usize / 4) + 2 + payload_bytes.len(),
398 );
399 encoded.extend_from_slice(&field_count.to_be_bytes());
400 for (start, len) in slot_table {
401 encoded.extend_from_slice(&start.to_be_bytes());
402 encoded.extend_from_slice(&len.to_be_bytes());
403 }
404 encoded.extend_from_slice(&payload_bytes);
405
406 Ok(encoded)
407 }
408}
409
410impl SlotWriter for SlotBufferWriter {
411 fn write_slot(&mut self, slot: usize, payload: Option<&[u8]>) -> Result<(), InternalError> {
412 let entry = self.slots.get_mut(slot).ok_or_else(|| {
413 InternalError::serialize_internal(format!(
414 "row encode failed: slot {slot} is outside the row layout",
415 ))
416 })?;
417 *entry = payload.map(<[u8]>::to_vec);
418
419 Ok(())
420 }
421}
422
423pub(in crate::db) fn encode_persisted_row<E>(entity: &E) -> Result<Vec<u8>, InternalError>
425where
426 E: PersistedRow,
427{
428 let mut writer = SlotBufferWriter::for_model(E::MODEL);
429 entity.write_slots(&mut writer)?;
430 writer.finish()
431}
432
/// [`SlotReader`] over one raw row, decoding slots on demand and caching the
/// values produced by `get_value`.
pub(in crate::db) struct StructuralSlotReader<'a> {
    // Model whose field list defines the slot layout.
    model: &'static EntityModel,
    // Per-slot byte views borrowed from the raw row.
    field_bytes: StructuralRowFieldBytes<'a>,
    // One cache entry per model field, `Pending` until first decoded.
    cached_values: Vec<CachedSlotValue>,
}
447
448impl<'a> StructuralSlotReader<'a> {
449 pub(in crate::db) fn from_raw_row(
451 raw_row: &'a RawRow,
452 model: &'static EntityModel,
453 ) -> Result<Self, InternalError> {
454 let field_bytes =
455 StructuralRowFieldBytes::from_raw_row(raw_row, model).map_err(|err| match err {
456 StructuralRowDecodeError::Deserialize(source) => source,
457 })?;
458 let cached_values = std::iter::repeat_with(|| CachedSlotValue::Pending)
459 .take(model.fields().len())
460 .collect();
461
462 Ok(Self {
463 model,
464 field_bytes,
465 cached_values,
466 })
467 }
468
469 pub(in crate::db) fn validate_storage_key_for_entity<E: EntityKind>(
471 &self,
472 data_key: &DataKey,
473 ) -> Result<(), InternalError> {
474 let Some(primary_key_slot) = resolve_primary_key_slot(E::MODEL) else {
475 return Err(InternalError::index_invariant(format!(
476 "entity primary key field missing during structural row validation: {} field={}",
477 E::PATH,
478 E::PRIMARY_KEY
479 )));
480 };
481 let field = self.field_model(primary_key_slot)?;
482 let primary_key_value = match self.get_scalar(primary_key_slot)? {
483 Some(ScalarSlotValueRef::Null) => Some(Value::Null),
484 Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
485 None => match self.field_bytes.field(primary_key_slot) {
486 Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
487 None => None,
488 },
489 };
490 let Some(primary_key_value) = primary_key_value else {
491 return Err(InternalError::serialize_corruption(format!(
492 "row decode failed: missing primary-key slot while validating {data_key}",
493 )));
494 };
495 let decoded_key = StorageKey::try_from_value(&primary_key_value).map_err(|err| {
496 InternalError::serialize_corruption(format!(
497 "row decode failed: primary-key value is not storage-key encodable: {data_key} ({err})",
498 ))
499 })?;
500 let expected_key = data_key.storage_key();
501
502 if decoded_key != expected_key {
503 return Err(InternalError::store_corruption(format!(
504 "row key mismatch: expected {expected_key}, found {decoded_key}",
505 )));
506 }
507
508 Ok(())
509 }
510
511 fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
513 self.model.fields().get(slot).ok_or_else(|| {
514 InternalError::index_invariant(format!(
515 "slot lookup outside model bounds during structural row access: model='{}' slot={slot}",
516 self.model.path(),
517 ))
518 })
519 }
520}
521
522impl SlotReader for StructuralSlotReader<'_> {
523 fn model(&self) -> &'static EntityModel {
524 self.model
525 }
526
527 fn has(&self, slot: usize) -> bool {
528 self.field_bytes.field(slot).is_some()
529 }
530
531 fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
532 self.field_bytes.field(slot)
533 }
534
535 fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
536 let field = self.field_model(slot)?;
537 let Some(raw_value) = self.field_bytes.field(slot) else {
538 return Ok(None);
539 };
540
541 match field.leaf_codec() {
542 LeafCodec::Scalar(codec) => {
543 decode_scalar_slot_value(raw_value, codec, field.name()).map(Some)
544 }
545 LeafCodec::CborFallback => Ok(None),
546 }
547 }
548
549 fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
550 let cached = self.cached_values.get(slot).ok_or_else(|| {
551 InternalError::index_invariant(format!(
552 "slot cache lookup outside model bounds during structural row access: model='{}' slot={slot}",
553 self.model.path(),
554 ))
555 })?;
556 if let CachedSlotValue::Decoded(value) = cached {
557 return Ok(value.clone());
558 }
559
560 let field = self.field_model(slot)?;
561 let value = match self.get_scalar(slot)? {
562 Some(ScalarSlotValueRef::Null) => Some(Value::Null),
563 Some(ScalarSlotValueRef::Value(value)) => Some(value.into_value()),
564 None => match self.field_bytes.field(slot) {
565 Some(raw_value) => Some(decode_non_scalar_slot_value(raw_value, field)?),
566 None => None,
567 },
568 };
569 self.cached_values[slot] = CachedSlotValue::Decoded(value.clone());
570
571 Ok(value)
572 }
573}
574
// Per-slot cache state used by `StructuralSlotReader::get_value`.
enum CachedSlotValue {
    // The slot has not been decoded yet.
    Pending,
    // The slot was decoded; `None` records that it had no bytes.
    Decoded(Option<Value>),
}
586
587fn encode_scalar_slot_value(value: ScalarSlotValueRef<'_>) -> Vec<u8> {
589 match value {
590 ScalarSlotValueRef::Null => vec![SCALAR_SLOT_PREFIX, SCALAR_SLOT_TAG_NULL],
591 ScalarSlotValueRef::Value(value) => {
592 let mut encoded = Vec::new();
593 encoded.push(SCALAR_SLOT_PREFIX);
594 encoded.push(SCALAR_SLOT_TAG_VALUE);
595
596 match value {
597 ScalarValueRef::Blob(bytes) => encoded.extend_from_slice(bytes),
598 ScalarValueRef::Bool(value) => encoded.push(u8::from(value)),
599 ScalarValueRef::Date(value) => {
600 encoded.extend_from_slice(&value.as_days_since_epoch().to_le_bytes());
601 }
602 ScalarValueRef::Duration(value) => {
603 encoded.extend_from_slice(&value.as_millis().to_le_bytes());
604 }
605 ScalarValueRef::Float32(value) => {
606 encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
607 }
608 ScalarValueRef::Float64(value) => {
609 encoded.extend_from_slice(&value.get().to_bits().to_le_bytes());
610 }
611 ScalarValueRef::Int(value) => encoded.extend_from_slice(&value.to_le_bytes()),
612 ScalarValueRef::Principal(value) => encoded.extend_from_slice(value.as_slice()),
613 ScalarValueRef::Subaccount(value) => encoded.extend_from_slice(&value.to_bytes()),
614 ScalarValueRef::Text(value) => encoded.extend_from_slice(value.as_bytes()),
615 ScalarValueRef::Timestamp(value) => {
616 encoded.extend_from_slice(&value.as_millis().to_le_bytes());
617 }
618 ScalarValueRef::Uint(value) => encoded.extend_from_slice(&value.to_le_bytes()),
619 ScalarValueRef::Ulid(value) => encoded.extend_from_slice(&value.to_bytes()),
620 ScalarValueRef::Unit => {}
621 }
622
623 encoded
624 }
625 }
626}
627
628fn decode_scalar_slot_payload_body<'a>(
630 bytes: &'a [u8],
631 field_name: &'static str,
632) -> Result<Option<&'a [u8]>, InternalError> {
633 let Some((&prefix, rest)) = bytes.split_first() else {
634 return Err(InternalError::serialize_corruption(format!(
635 "row decode failed for field '{field_name}': empty scalar payload",
636 )));
637 };
638 if prefix != SCALAR_SLOT_PREFIX {
639 return Err(InternalError::serialize_corruption(format!(
640 "row decode failed for field '{field_name}': scalar payload prefix mismatch",
641 )));
642 }
643 let Some((&tag, payload)) = rest.split_first() else {
644 return Err(InternalError::serialize_corruption(format!(
645 "row decode failed for field '{field_name}': truncated scalar payload tag",
646 )));
647 };
648
649 match tag {
650 SCALAR_SLOT_TAG_NULL => {
651 if !payload.is_empty() {
652 return Err(InternalError::serialize_corruption(format!(
653 "row decode failed for field '{field_name}': null scalar payload has trailing bytes",
654 )));
655 }
656
657 Ok(None)
658 }
659 SCALAR_SLOT_TAG_VALUE => Ok(Some(payload)),
660 _ => Err(InternalError::serialize_corruption(format!(
661 "row decode failed for field '{field_name}': invalid scalar payload tag {tag}",
662 ))),
663 }
664}
665
666#[expect(clippy::too_many_lines)]
668fn decode_scalar_slot_value<'a>(
669 bytes: &'a [u8],
670 codec: ScalarCodec,
671 field_name: &'static str,
672) -> Result<ScalarSlotValueRef<'a>, InternalError> {
673 let Some(payload) = decode_scalar_slot_payload_body(bytes, field_name)? else {
674 return Ok(ScalarSlotValueRef::Null);
675 };
676
677 let value = match codec {
678 ScalarCodec::Blob => ScalarValueRef::Blob(payload),
679 ScalarCodec::Bool => {
680 let [value] = payload else {
681 return Err(InternalError::serialize_corruption(format!(
682 "row decode failed for field '{field_name}': bool payload must be exactly 1 byte",
683 )));
684 };
685 match value {
686 0 => ScalarValueRef::Bool(false),
687 1 => ScalarValueRef::Bool(true),
688 _ => {
689 return Err(InternalError::serialize_corruption(format!(
690 "row decode failed for field '{field_name}': invalid bool payload byte {value}",
691 )));
692 }
693 }
694 }
695 ScalarCodec::Date => {
696 let bytes: [u8; 4] = payload.try_into().map_err(|_| {
697 InternalError::serialize_corruption(format!(
698 "row decode failed for field '{field_name}': date payload must be exactly 4 bytes",
699 ))
700 })?;
701 ScalarValueRef::Date(Date::from_days_since_epoch(i32::from_le_bytes(bytes)))
702 }
703 ScalarCodec::Duration => {
704 let bytes: [u8; 8] = payload.try_into().map_err(|_| {
705 InternalError::serialize_corruption(format!(
706 "row decode failed for field '{field_name}': duration payload must be exactly 8 bytes",
707 ))
708 })?;
709 ScalarValueRef::Duration(Duration::from_millis(u64::from_le_bytes(bytes)))
710 }
711 ScalarCodec::Float32 => {
712 let bytes: [u8; 4] = payload.try_into().map_err(|_| {
713 InternalError::serialize_corruption(format!(
714 "row decode failed for field '{field_name}': float32 payload must be exactly 4 bytes",
715 ))
716 })?;
717 let value = f32::from_bits(u32::from_le_bytes(bytes));
718 let value = Float32::try_new(value).ok_or_else(|| {
719 InternalError::serialize_corruption(format!(
720 "row decode failed for field '{field_name}': float32 payload is non-finite",
721 ))
722 })?;
723 ScalarValueRef::Float32(value)
724 }
725 ScalarCodec::Float64 => {
726 let bytes: [u8; 8] = payload.try_into().map_err(|_| {
727 InternalError::serialize_corruption(format!(
728 "row decode failed for field '{field_name}': float64 payload must be exactly 8 bytes",
729 ))
730 })?;
731 let value = f64::from_bits(u64::from_le_bytes(bytes));
732 let value = Float64::try_new(value).ok_or_else(|| {
733 InternalError::serialize_corruption(format!(
734 "row decode failed for field '{field_name}': float64 payload is non-finite",
735 ))
736 })?;
737 ScalarValueRef::Float64(value)
738 }
739 ScalarCodec::Int64 => {
740 let bytes: [u8; 8] = payload.try_into().map_err(|_| {
741 InternalError::serialize_corruption(format!(
742 "row decode failed for field '{field_name}': int payload must be exactly 8 bytes",
743 ))
744 })?;
745 ScalarValueRef::Int(i64::from_le_bytes(bytes))
746 }
747 ScalarCodec::Principal => {
748 ScalarValueRef::Principal(Principal::try_from_bytes(payload).map_err(|err| {
749 InternalError::serialize_corruption(format!(
750 "row decode failed for field '{field_name}': {err}",
751 ))
752 })?)
753 }
754 ScalarCodec::Subaccount => {
755 let bytes: [u8; 32] = payload.try_into().map_err(|_| {
756 InternalError::serialize_corruption(format!(
757 "row decode failed for field '{field_name}': subaccount payload must be exactly 32 bytes",
758 ))
759 })?;
760 ScalarValueRef::Subaccount(Subaccount::from_array(bytes))
761 }
762 ScalarCodec::Text => {
763 let value = str::from_utf8(payload).map_err(|err| {
764 InternalError::serialize_corruption(format!(
765 "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
766 ))
767 })?;
768 ScalarValueRef::Text(value)
769 }
770 ScalarCodec::Timestamp => {
771 let bytes: [u8; 8] = payload.try_into().map_err(|_| {
772 InternalError::serialize_corruption(format!(
773 "row decode failed for field '{field_name}': timestamp payload must be exactly 8 bytes",
774 ))
775 })?;
776 ScalarValueRef::Timestamp(Timestamp::from_millis(i64::from_le_bytes(bytes)))
777 }
778 ScalarCodec::Uint64 => {
779 let bytes: [u8; 8] = payload.try_into().map_err(|_| {
780 InternalError::serialize_corruption(format!(
781 "row decode failed for field '{field_name}': uint payload must be exactly 8 bytes",
782 ))
783 })?;
784 ScalarValueRef::Uint(u64::from_le_bytes(bytes))
785 }
786 ScalarCodec::Ulid => {
787 let bytes: [u8; 16] = payload.try_into().map_err(|_| {
788 InternalError::serialize_corruption(format!(
789 "row decode failed for field '{field_name}': ulid payload must be exactly 16 bytes",
790 ))
791 })?;
792 ScalarValueRef::Ulid(Ulid::from_bytes(bytes))
793 }
794 ScalarCodec::Unit => {
795 if !payload.is_empty() {
796 return Err(InternalError::serialize_corruption(format!(
797 "row decode failed for field '{field_name}': unit payload must be empty",
798 )));
799 }
800 ScalarValueRef::Unit
801 }
802 };
803
804 Ok(ScalarSlotValueRef::Value(value))
805}
806
// Implements `PersistedScalar` for signed integer types: values are widened to
// `i64` and stored as 8 little-endian bytes; decoding narrows back with a range check.
macro_rules! impl_persisted_scalar_signed {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Int64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = i64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; 8]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': int payload must be exactly 8 bytes",
                        )));
                    };

                    match <$ty>::try_from(i64::from_le_bytes(raw)) {
                        Ok(narrowed) => Ok(narrowed),
                        Err(_) => Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': integer payload out of range for target type",
                        ))),
                    }
                }
            }
        )*
    };
}
836
// Implements `PersistedScalar` for unsigned integer types: values are widened to
// `u64` and stored as 8 little-endian bytes; decoding narrows back with a range check.
macro_rules! impl_persisted_scalar_unsigned {
    ($($ty:ty),* $(,)?) => {
        $(
            impl PersistedScalar for $ty {
                const CODEC: ScalarCodec = ScalarCodec::Uint64;

                fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
                    let widened = u64::from(*self);

                    Ok(widened.to_le_bytes().to_vec())
                }

                fn decode_scalar_payload(
                    bytes: &[u8],
                    field_name: &'static str,
                ) -> Result<Self, InternalError> {
                    let Ok(raw) = <[u8; 8]>::try_from(bytes) else {
                        return Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': uint payload must be exactly 8 bytes",
                        )));
                    };

                    match <$ty>::try_from(u64::from_le_bytes(raw)) {
                        Ok(narrowed) => Ok(narrowed),
                        Err(_) => Err(InternalError::serialize_corruption(format!(
                            "row decode failed for field '{field_name}': unsigned payload out of range for target type",
                        ))),
                    }
                }
            }
        )*
    };
}
866
// Signed integers share the `Int64` codec; unsigned integers share `Uint64`.
impl_persisted_scalar_signed!(i8, i16, i32, i64);
impl_persisted_scalar_unsigned!(u8, u16, u32, u64);
869
870impl PersistedScalar for bool {
871 const CODEC: ScalarCodec = ScalarCodec::Bool;
872
873 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
874 Ok(vec![u8::from(*self)])
875 }
876
877 fn decode_scalar_payload(
878 bytes: &[u8],
879 field_name: &'static str,
880 ) -> Result<Self, InternalError> {
881 let [value] = bytes else {
882 return Err(InternalError::serialize_corruption(format!(
883 "row decode failed for field '{field_name}': bool payload must be exactly 1 byte",
884 )));
885 };
886
887 match value {
888 0 => Ok(false),
889 1 => Ok(true),
890 _ => Err(InternalError::serialize_corruption(format!(
891 "row decode failed for field '{field_name}': invalid bool payload byte {value}",
892 ))),
893 }
894 }
895}
896
897impl PersistedScalar for String {
898 const CODEC: ScalarCodec = ScalarCodec::Text;
899
900 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
901 Ok(self.as_bytes().to_vec())
902 }
903
904 fn decode_scalar_payload(
905 bytes: &[u8],
906 field_name: &'static str,
907 ) -> Result<Self, InternalError> {
908 str::from_utf8(bytes).map(str::to_owned).map_err(|err| {
909 InternalError::serialize_corruption(format!(
910 "row decode failed for field '{field_name}': invalid UTF-8 text payload ({err})",
911 ))
912 })
913 }
914}
915
916impl PersistedScalar for Vec<u8> {
917 const CODEC: ScalarCodec = ScalarCodec::Blob;
918
919 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
920 Ok(self.clone())
921 }
922
923 fn decode_scalar_payload(
924 bytes: &[u8],
925 _field_name: &'static str,
926 ) -> Result<Self, InternalError> {
927 Ok(bytes.to_vec())
928 }
929}
930
931impl PersistedScalar for Blob {
932 const CODEC: ScalarCodec = ScalarCodec::Blob;
933
934 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
935 Ok(self.to_vec())
936 }
937
938 fn decode_scalar_payload(
939 bytes: &[u8],
940 _field_name: &'static str,
941 ) -> Result<Self, InternalError> {
942 Ok(Self::from(bytes))
943 }
944}
945
946impl PersistedScalar for Ulid {
947 const CODEC: ScalarCodec = ScalarCodec::Ulid;
948
949 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
950 Ok(self.to_bytes().to_vec())
951 }
952
953 fn decode_scalar_payload(
954 bytes: &[u8],
955 field_name: &'static str,
956 ) -> Result<Self, InternalError> {
957 Self::try_from_bytes(bytes).map_err(|err| {
958 InternalError::serialize_corruption(format!(
959 "row decode failed for field '{field_name}': {err}",
960 ))
961 })
962 }
963}
964
965impl PersistedScalar for Timestamp {
966 const CODEC: ScalarCodec = ScalarCodec::Timestamp;
967
968 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
969 Ok(self.as_millis().to_le_bytes().to_vec())
970 }
971
972 fn decode_scalar_payload(
973 bytes: &[u8],
974 field_name: &'static str,
975 ) -> Result<Self, InternalError> {
976 let raw: [u8; 8] = bytes.try_into().map_err(|_| {
977 InternalError::serialize_corruption(format!(
978 "row decode failed for field '{field_name}': timestamp payload must be exactly 8 bytes",
979 ))
980 })?;
981
982 Ok(Self::from_millis(i64::from_le_bytes(raw)))
983 }
984}
985
986impl PersistedScalar for Date {
987 const CODEC: ScalarCodec = ScalarCodec::Date;
988
989 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
990 Ok(self.as_days_since_epoch().to_le_bytes().to_vec())
991 }
992
993 fn decode_scalar_payload(
994 bytes: &[u8],
995 field_name: &'static str,
996 ) -> Result<Self, InternalError> {
997 let raw: [u8; 4] = bytes.try_into().map_err(|_| {
998 InternalError::serialize_corruption(format!(
999 "row decode failed for field '{field_name}': date payload must be exactly 4 bytes",
1000 ))
1001 })?;
1002
1003 Ok(Self::from_days_since_epoch(i32::from_le_bytes(raw)))
1004 }
1005}
1006
1007impl PersistedScalar for Duration {
1008 const CODEC: ScalarCodec = ScalarCodec::Duration;
1009
1010 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1011 Ok(self.as_millis().to_le_bytes().to_vec())
1012 }
1013
1014 fn decode_scalar_payload(
1015 bytes: &[u8],
1016 field_name: &'static str,
1017 ) -> Result<Self, InternalError> {
1018 let raw: [u8; 8] = bytes.try_into().map_err(|_| {
1019 InternalError::serialize_corruption(format!(
1020 "row decode failed for field '{field_name}': duration payload must be exactly 8 bytes",
1021 ))
1022 })?;
1023
1024 Ok(Self::from_millis(u64::from_le_bytes(raw)))
1025 }
1026}
1027
1028impl PersistedScalar for Float32 {
1029 const CODEC: ScalarCodec = ScalarCodec::Float32;
1030
1031 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1032 Ok(self.get().to_bits().to_le_bytes().to_vec())
1033 }
1034
1035 fn decode_scalar_payload(
1036 bytes: &[u8],
1037 field_name: &'static str,
1038 ) -> Result<Self, InternalError> {
1039 let raw: [u8; 4] = bytes.try_into().map_err(|_| {
1040 InternalError::serialize_corruption(format!(
1041 "row decode failed for field '{field_name}': float32 payload must be exactly 4 bytes",
1042 ))
1043 })?;
1044 let value = f32::from_bits(u32::from_le_bytes(raw));
1045
1046 Self::try_new(value).ok_or_else(|| {
1047 InternalError::serialize_corruption(format!(
1048 "row decode failed for field '{field_name}': float32 payload is non-finite",
1049 ))
1050 })
1051 }
1052}
1053
1054impl PersistedScalar for Float64 {
1055 const CODEC: ScalarCodec = ScalarCodec::Float64;
1056
1057 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1058 Ok(self.get().to_bits().to_le_bytes().to_vec())
1059 }
1060
1061 fn decode_scalar_payload(
1062 bytes: &[u8],
1063 field_name: &'static str,
1064 ) -> Result<Self, InternalError> {
1065 let raw: [u8; 8] = bytes.try_into().map_err(|_| {
1066 InternalError::serialize_corruption(format!(
1067 "row decode failed for field '{field_name}': float64 payload must be exactly 8 bytes",
1068 ))
1069 })?;
1070 let value = f64::from_bits(u64::from_le_bytes(raw));
1071
1072 Self::try_new(value).ok_or_else(|| {
1073 InternalError::serialize_corruption(format!(
1074 "row decode failed for field '{field_name}': float64 payload is non-finite",
1075 ))
1076 })
1077 }
1078}
1079
1080impl PersistedScalar for Principal {
1081 const CODEC: ScalarCodec = ScalarCodec::Principal;
1082
1083 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1084 self.to_bytes().map_err(|err| {
1085 InternalError::serialize_internal(format!(
1086 "row encode failed for principal field: {err}",
1087 ))
1088 })
1089 }
1090
1091 fn decode_scalar_payload(
1092 bytes: &[u8],
1093 field_name: &'static str,
1094 ) -> Result<Self, InternalError> {
1095 Self::try_from_bytes(bytes).map_err(|err| {
1096 InternalError::serialize_corruption(format!(
1097 "row decode failed for field '{field_name}': {err}",
1098 ))
1099 })
1100 }
1101}
1102
1103impl PersistedScalar for Subaccount {
1104 const CODEC: ScalarCodec = ScalarCodec::Subaccount;
1105
1106 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1107 Ok(self.to_bytes().to_vec())
1108 }
1109
1110 fn decode_scalar_payload(
1111 bytes: &[u8],
1112 field_name: &'static str,
1113 ) -> Result<Self, InternalError> {
1114 let raw: [u8; 32] = bytes.try_into().map_err(|_| {
1115 InternalError::serialize_corruption(format!(
1116 "row decode failed for field '{field_name}': subaccount payload must be exactly 32 bytes",
1117 ))
1118 })?;
1119
1120 Ok(Self::from_array(raw))
1121 }
1122}
1123
1124impl PersistedScalar for () {
1125 const CODEC: ScalarCodec = ScalarCodec::Unit;
1126
1127 fn encode_scalar_payload(&self) -> Result<Vec<u8>, InternalError> {
1128 Ok(Vec::new())
1129 }
1130
1131 fn decode_scalar_payload(
1132 bytes: &[u8],
1133 field_name: &'static str,
1134 ) -> Result<Self, InternalError> {
1135 if !bytes.is_empty() {
1136 return Err(InternalError::serialize_corruption(format!(
1137 "row decode failed for field '{field_name}': unit payload must be empty",
1138 )));
1139 }
1140
1141 Ok(())
1142 }
1143}