use super::{
blob_store::BlobStore,
indexes::{Bytes, PageOffset},
page::Page,
row_hash,
var_len::VarLenRef,
};
use core::cell::Cell;
use core::str;
use spacetimedb_sats::{
i256, impl_serialize,
layout::{
align_to, AlgebraicTypeLayout, HasLayout as _, ProductTypeLayoutView, RowTypeLayout, SumTypeLayout, VarLenType,
},
ser::{SerializeNamedProduct, Serializer},
u256, ArrayType,
};
pub unsafe fn serialize_row_from_page<S: Serializer>(
ser: S,
page: &Page,
blob_store: &dyn BlobStore,
fixed_offset: PageOffset,
ty: &RowTypeLayout,
) -> Result<S::Ok, S::Error> {
let fixed_bytes = page.get_row_data(fixed_offset, ty.size());
unsafe { serialize_product(ser, fixed_bytes, page, blob_store, &Cell::new(0), ty.product()) }
}
type CurrOffset<'a> = &'a Cell<usize>;
fn update<R>(curr_offset: CurrOffset<'_>, with: impl FnOnce(&mut usize) -> R) -> R {
let mut tmp = curr_offset.get();
let ret = with(&mut tmp);
curr_offset.set(tmp);
ret
}
unsafe fn serialize_product<S: Serializer>(
ser: S,
bytes: &Bytes,
page: &Page,
blob_store: &dyn BlobStore,
curr_offset: CurrOffset<'_>,
ty: ProductTypeLayoutView<'_>,
) -> Result<S::Ok, S::Error> {
let elems = &ty.elements;
let mut ser = ser.serialize_named_product(elems.len())?;
let my_offset = curr_offset.get();
for elem_ty in elems.iter() {
curr_offset.set(my_offset + elem_ty.offset as usize);
let value = Value {
bytes,
page,
blob_store,
curr_offset,
ty: &elem_ty.ty,
};
ser.serialize_element(elem_ty.name.as_deref(), &value)?;
}
ser.end()
}
unsafe fn serialize_sum<S: Serializer>(
ser: S,
bytes: &Bytes,
page: &Page,
blob_store: &dyn BlobStore,
curr_offset: CurrOffset<'_>,
ty: &SumTypeLayout,
) -> Result<S::Ok, S::Error> {
let (tag, data_ty) = read_tag(bytes, ty, curr_offset.get());
let data_offset = &Cell::new(curr_offset.get() + ty.offset_of_variant_data(tag));
let data_value = Value {
bytes,
page,
blob_store,
curr_offset: data_offset,
ty: data_ty,
};
let ret = ser.serialize_variant(tag, None, &data_value);
update(curr_offset, |co| *co += ty.size());
ret
}
pub fn read_tag<'ty>(bytes: &Bytes, ty: &'ty SumTypeLayout, curr_offset: usize) -> (u8, &'ty AlgebraicTypeLayout) {
let tag_offset = ty.offset_of_tag();
let tag = bytes[curr_offset + tag_offset];
let data_ty = &ty.variants[tag as usize].ty;
(tag, data_ty)
}
struct Value<'a> {
bytes: &'a Bytes,
page: &'a Page,
blob_store: &'a dyn BlobStore,
curr_offset: CurrOffset<'a>,
ty: &'a AlgebraicTypeLayout,
}
impl_serialize!(['a] Value<'a>, (self, ser) => {
unsafe { serialize_value(ser, self.bytes, self.page, self.blob_store, self.curr_offset, self.ty) }
});
pub(crate) unsafe fn serialize_value<S: Serializer>(
ser: S,
bytes: &Bytes,
page: &Page,
blob_store: &dyn BlobStore,
curr_offset: CurrOffset<'_>,
ty: &AlgebraicTypeLayout,
) -> Result<S::Ok, S::Error> {
debug_assert_eq!(
curr_offset.get(),
align_to(curr_offset.get(), ty.align()),
"curr_offset {curr_offset:?} insufficiently aligned for type {ty:#?}",
);
match ty {
AlgebraicTypeLayout::Sum(ty) => {
unsafe { serialize_sum(ser, bytes, page, blob_store, curr_offset, ty) }
}
AlgebraicTypeLayout::Product(ty) => {
unsafe { serialize_product(ser, bytes, page, blob_store, curr_offset, ty.view()) }
}
&AlgebraicTypeLayout::Bool => ser.serialize_bool(unsafe { read_from_bytes::<u8>(bytes, curr_offset) } != 0),
&AlgebraicTypeLayout::I8 => ser.serialize_i8(unsafe { read_from_bytes(bytes, curr_offset) }),
&AlgebraicTypeLayout::U8 => ser.serialize_u8(unsafe { read_from_bytes(bytes, curr_offset) }),
&AlgebraicTypeLayout::I16 => {
ser.serialize_i16(i16::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::U16 => {
ser.serialize_u16(u16::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::I32 => {
ser.serialize_i32(i32::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::U32 => {
ser.serialize_u32(u32::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::I64 => {
ser.serialize_i64(i64::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::U64 => {
ser.serialize_u64(u64::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::I128 => {
ser.serialize_i128(i128::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::U128 => {
ser.serialize_u128(u128::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::I256 => {
ser.serialize_i256(i256::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::U256 => {
ser.serialize_u256(u256::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::F32 => {
ser.serialize_f32(f32::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::F64 => {
ser.serialize_f64(f64::from_le_bytes(unsafe { read_from_bytes(bytes, curr_offset) }))
}
&AlgebraicTypeLayout::String => {
unsafe { serialize_string(ser, bytes, page, blob_store, curr_offset) }
}
AlgebraicTypeLayout::VarLen(VarLenType::Array(ty)) => {
unsafe { serialize_array(ser, bytes, page, blob_store, curr_offset, ty) }
}
}
}
unsafe fn serialize_string<S: Serializer>(
ser: S,
bytes: &Bytes,
page: &Page,
blob_store: &dyn BlobStore,
curr_offset: CurrOffset<'_>,
) -> Result<S::Ok, S::Error> {
let vlr = unsafe { read_from_bytes::<VarLenRef>(bytes, curr_offset) };
if vlr.is_large_blob() {
let blob = unsafe { vlr_blob_bytes(page, blob_store, vlr) };
let str = unsafe { str::from_utf8_unchecked(blob) };
ser.serialize_str(str)
} else {
let var_iter = unsafe { page.iter_vlo_data(vlr.first_granule) };
let total_len = vlr.length_in_bytes as usize;
unsafe { ser.serialize_str_in_chunks(total_len, var_iter) }
}
}
unsafe fn serialize_array<S: Serializer>(
ser: S,
bytes: &Bytes,
page: &Page,
blob_store: &dyn BlobStore,
curr_offset: CurrOffset<'_>,
ty: &ArrayType,
) -> Result<S::Ok, S::Error> {
let vlr = unsafe { read_from_bytes::<VarLenRef>(bytes, curr_offset) };
if vlr.is_large_blob() {
let blob = unsafe { vlr_blob_bytes(page, blob_store, vlr) };
unsafe { ser.serialize_bsatn(ty, blob) }
} else {
let var_iter = unsafe { page.iter_vlo_data(vlr.first_granule) };
let total_len = vlr.length_in_bytes as usize;
unsafe { ser.serialize_bsatn_in_chunks(ty, total_len, var_iter) }
}
}
#[cold]
#[inline(never)]
pub(crate) unsafe fn vlr_blob_bytes<'b>(page: &Page, blob_store: &'b dyn BlobStore, vlr: VarLenRef) -> &'b [u8] {
let mut var_iter = unsafe { page.iter_var_len_object(vlr.first_granule) };
let granule = var_iter.next();
let granule = unsafe { granule.unwrap_unchecked() };
let hash = granule.blob_hash();
blob_store.retrieve_blob(&hash).unwrap()
}
pub unsafe fn read_from_bytes<T: Copy>(bytes: &Bytes, curr_offset: CurrOffset<'_>) -> T {
update(curr_offset, |co| unsafe { row_hash::read_from_bytes(bytes, co) })
}