1use crate::layout::ProductTypeLayoutView;
6
7use super::{
8 blob_store::BlobStore,
9 indexes::{Bytes, PageOffset, RowPointer, SquashedOffset},
10 layout::{
11 align_to, bsatn_len, required_var_len_granules_for_row, AlgebraicTypeLayout, HasLayout, RowTypeLayout,
12 SumTypeLayout, VarLenType,
13 },
14 page::{GranuleOffsetIter, Page, VarView},
15 page_pool::PagePool,
16 pages::Pages,
17 table::BlobNumBytes,
18 util::range_move,
19 var_len::{VarLenGranule, VarLenMembers, VarLenRef},
20};
21use spacetimedb_sats::{
22 bsatn::{self, to_writer, DecodeError},
23 buffer::BufWriter,
24 de::DeserializeSeed as _,
25 i256, u256, AlgebraicType, AlgebraicValue, ProductValue, SumValue,
26};
27use thiserror::Error;
28
29#[derive(Error, Debug, PartialEq, Eq)]
30pub enum Error {
31 #[error(transparent)]
32 Decode(#[from] DecodeError),
33 #[error("Expected a value of type {0:?}, but found {1:?}")]
34 WrongType(AlgebraicType, AlgebraicValue),
35 #[error(transparent)]
36 PageError(#[from] super::page::Error),
37 #[error(transparent)]
38 PagesError(#[from] super::pages::Error),
39}
40
41pub unsafe fn write_row_to_pages_bsatn(
53 pool: &PagePool,
54 pages: &mut Pages,
55 visitor: &impl VarLenMembers,
56 blob_store: &mut dyn BlobStore,
57 ty: &RowTypeLayout,
58 mut bytes: &[u8],
59 squashed_offset: SquashedOffset,
60) -> Result<(RowPointer, BlobNumBytes), Error> {
61 let val = ty.product().deserialize(bsatn::Deserializer::new(&mut bytes))?;
62 unsafe { write_row_to_pages(pool, pages, visitor, blob_store, ty, &val, squashed_offset) }
63}
64
65pub unsafe fn write_row_to_pages(
77 pool: &PagePool,
78 pages: &mut Pages,
79 visitor: &impl VarLenMembers,
80 blob_store: &mut dyn BlobStore,
81 ty: &RowTypeLayout,
82 val: &ProductValue,
83 squashed_offset: SquashedOffset,
84) -> Result<(RowPointer, BlobNumBytes), Error> {
85 let num_granules = required_var_len_granules_for_row(val);
86
87 match pages.with_page_to_insert_row(pool, ty.size(), num_granules, |page| {
88 unsafe { write_row_to_page(page, blob_store, visitor, ty, val) }
95 })? {
96 (page, Ok((offset, blob_inserted))) => {
97 Ok((RowPointer::new(false, page, offset, squashed_offset), blob_inserted))
98 }
99 (_, Err(e)) => Err(e),
100 }
101}
102
103pub unsafe fn write_row_to_page(
119 page: &mut Page,
120 blob_store: &mut dyn BlobStore,
121 visitor: &impl VarLenMembers,
122 ty: &RowTypeLayout,
123 val: &ProductValue,
124) -> Result<(PageOffset, BlobNumBytes), Error> {
125 let fixed_row_size = ty.size();
126 let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size)? };
129
130 let (mut fixed, var_view) = page.split_fixed_var_mut();
132 let mut serialized = BflatnSerializedRowBuffer {
133 fixed_buf: fixed.get_row_mut(fixed_offset, fixed_row_size),
134 curr_offset: 0,
135 var_view,
136 last_allocated_var_len_index: 0,
137 large_blob_insertions: Vec::new(),
138 };
139
140 if let Err(e) = serialized.write_product(ty.product(), val) {
142 unsafe { serialized.roll_back_var_len_allocations(visitor) };
144 unsafe { fixed.free(fixed_offset, fixed_row_size) };
148 return Err(e);
149 }
150
151 let blob_store_inserted_bytes = serialized.write_large_blobs(blob_store);
153
154 Ok((fixed_offset, blob_store_inserted_bytes))
155}
156
157struct BflatnSerializedRowBuffer<'page> {
159 fixed_buf: &'page mut Bytes,
162
163 curr_offset: usize,
167
168 last_allocated_var_len_index: usize,
170
171 large_blob_insertions: Vec<(VarLenRef, Vec<u8>)>,
175
176 var_view: VarView<'page>,
178}
179
180impl BflatnSerializedRowBuffer<'_> {
181 unsafe fn roll_back_var_len_allocations(&mut self, visitor: &impl VarLenMembers) {
187 let visitor_iter = unsafe { visitor.visit_var_len(self.fixed_buf) };
193 for vlr in visitor_iter.take(self.last_allocated_var_len_index) {
194 unsafe { self.var_view.free_object_ignore_blob(*vlr) };
198 }
199 }
200
201 fn write_large_blobs(mut self, blob_store: &mut dyn BlobStore) -> BlobNumBytes {
203 let mut blob_store_inserted_bytes = BlobNumBytes::default();
204 for (vlr, value) in self.large_blob_insertions {
205 unsafe {
210 blob_store_inserted_bytes += self.var_view.write_large_blob_hash_to_granule(blob_store, &value, vlr);
211 }
212 }
213 blob_store_inserted_bytes
214 }
215
216 fn write_value(&mut self, ty: &AlgebraicTypeLayout, val: &AlgebraicValue) -> Result<(), Error> {
218 debug_assert_eq!(
219 self.curr_offset,
220 align_to(self.curr_offset, ty.align()),
221 "curr_offset {} insufficiently aligned for type {:#?}",
222 self.curr_offset,
223 val,
224 );
225
226 match (ty, val) {
227 (AlgebraicTypeLayout::Sum(ty), AlgebraicValue::Sum(val)) => self.write_sum(ty, val)?,
231 (AlgebraicTypeLayout::Product(ty), AlgebraicValue::Product(val)) => self.write_product(ty.view(), val)?,
233
234 (&AlgebraicTypeLayout::Bool, AlgebraicValue::Bool(val)) => self.write_bool(*val),
236 (&AlgebraicTypeLayout::I8, AlgebraicValue::I8(val)) => self.write_i8(*val),
238 (&AlgebraicTypeLayout::U8, AlgebraicValue::U8(val)) => self.write_u8(*val),
239 (&AlgebraicTypeLayout::I16, AlgebraicValue::I16(val)) => self.write_i16(*val),
240 (&AlgebraicTypeLayout::U16, AlgebraicValue::U16(val)) => self.write_u16(*val),
241 (&AlgebraicTypeLayout::I32, AlgebraicValue::I32(val)) => self.write_i32(*val),
242 (&AlgebraicTypeLayout::U32, AlgebraicValue::U32(val)) => self.write_u32(*val),
243 (&AlgebraicTypeLayout::I64, AlgebraicValue::I64(val)) => self.write_i64(*val),
244 (&AlgebraicTypeLayout::U64, AlgebraicValue::U64(val)) => self.write_u64(*val),
245 (&AlgebraicTypeLayout::I128, AlgebraicValue::I128(val)) => self.write_i128(val.0),
246 (&AlgebraicTypeLayout::U128, AlgebraicValue::U128(val)) => self.write_u128(val.0),
247 (&AlgebraicTypeLayout::I256, AlgebraicValue::I256(val)) => self.write_i256(**val),
248 (&AlgebraicTypeLayout::U256, AlgebraicValue::U256(val)) => self.write_u256(**val),
249 (&AlgebraicTypeLayout::F32, AlgebraicValue::F32(val)) => self.write_f32((*val).into()),
251 (&AlgebraicTypeLayout::F64, AlgebraicValue::F64(val)) => self.write_f64((*val).into()),
252
253 (&AlgebraicTypeLayout::String, AlgebraicValue::String(val)) => self.write_string(val)?,
256
257 (AlgebraicTypeLayout::VarLen(VarLenType::Array(_)), val @ AlgebraicValue::Array(_)) => {
260 self.write_av_bsatn(val)?
261 }
262
263 (ty, val) => Err(Error::WrongType(ty.algebraic_type(), val.clone()))?,
265 }
266
267 Ok(())
268 }
269
270 fn write_sum(&mut self, ty: &SumTypeLayout, val: &SumValue) -> Result<(), Error> {
272 let SumValue { tag, ref value } = *val;
274 let variant_ty = &ty.variants[tag as usize];
275 let variant_offset = self.curr_offset + ty.offset_of_variant_data(tag);
276 let tag_offset = self.curr_offset + ty.offset_of_tag();
277
278 self.curr_offset = variant_offset;
280 self.write_value(&variant_ty.ty, value)?;
281
282 self.curr_offset = tag_offset;
284 self.write_u8(tag);
285
286 Ok(())
287 }
288
289 fn write_product(&mut self, ty: ProductTypeLayoutView<'_>, val: &ProductValue) -> Result<(), Error> {
291 if ty.elements.len() != val.elements.len() {
298 return Err(Error::WrongType(
299 ty.algebraic_type(),
300 AlgebraicValue::Product(val.clone()),
301 ));
302 }
303
304 let base_offset = self.curr_offset;
305
306 for (elt_ty, elt) in ty.elements.iter().zip(val.elements.iter()) {
307 self.curr_offset = base_offset + elt_ty.offset as usize;
308 self.write_value(&elt_ty.ty, elt)?;
309 }
310 Ok(())
311 }
312
313 fn write_string(&mut self, val: &str) -> Result<(), Error> {
316 let val = val.as_bytes();
317
318 let (vlr, in_blob) = self.var_view.alloc_for_slice(val)?;
320 if in_blob {
321 self.defer_insert_large_blob(vlr, val.to_vec());
322 }
323
324 self.write_var_len_ref(vlr);
326 Ok(())
327 }
328
329 fn write_av_bsatn(&mut self, val: &AlgebraicValue) -> Result<(), Error> {
332 let len_in_bytes = bsatn_len(val);
334 let (vlr, in_blob) = self.var_view.alloc_for_len(len_in_bytes)?;
335
336 self.write_var_len_ref(vlr);
338
339 if in_blob {
340 let mut bytes = Vec::with_capacity(len_in_bytes);
343 val.encode(&mut bytes);
344 self.defer_insert_large_blob(vlr, bytes);
345 } else {
346 let iter = unsafe { self.var_view.granule_offset_iter(vlr.first_granule) };
352 let mut writer = GranuleBufWriter { buf: None, iter };
353 to_writer(&mut writer, val).unwrap();
354 }
355
356 struct GranuleBufWriter<'vv, 'page> {
358 buf: Option<(PageOffset, usize)>,
361 iter: GranuleOffsetIter<'page, 'vv>,
363 }
364 impl BufWriter for GranuleBufWriter<'_, '_> {
365 fn put_slice(&mut self, mut slice: &[u8]) {
366 while !slice.is_empty() {
367 let (offset, start) = match self.buf.take() {
368 Some(buf @ (_, start)) if start < VarLenGranule::DATA_SIZE => buf,
370 _ => {
372 let next = self.iter.next();
373 debug_assert!(next.is_some());
374 let next = unsafe { next.unwrap_unchecked() };
377 (next, 0)
378 }
379 };
380
381 let capacity_remains = VarLenGranule::DATA_SIZE - start;
384 debug_assert!(capacity_remains > 0);
385 let extend_len = capacity_remains.min(slice.len());
386 let (extend_with, rest) = slice.split_at(extend_len);
387 let write_to = unsafe { self.iter.get_mut_data(offset, start) };
392
393 for (to, byte) in write_to.iter_mut().zip(extend_with) {
395 *to = *byte;
396 }
397
398 slice = rest;
399 self.buf = Some((offset, start + extend_len));
400 }
401 }
402 }
403
404 Ok(())
405 }
406
407 fn write_var_len_ref(&mut self, val: VarLenRef) {
409 self.write_u16(val.length_in_bytes);
410 self.write_u16(val.first_granule.0);
411
412 self.last_allocated_var_len_index += 1;
415 }
416
417 fn defer_insert_large_blob(&mut self, vlr: VarLenRef, obj_bytes: Vec<u8>) {
419 self.large_blob_insertions.push((vlr, obj_bytes));
420 }
421
422 fn write_bytes<const N: usize>(&mut self, bytes: &[u8; N]) {
425 self.fixed_buf[range_move(0..N, self.curr_offset)].copy_from_slice(bytes);
426 self.curr_offset += N;
427 }
428
429 fn write_u8(&mut self, val: u8) {
431 self.write_bytes(&[val]);
432 }
433
434 fn write_i8(&mut self, val: i8) {
436 self.write_u8(val as u8);
437 }
438
439 fn write_bool(&mut self, val: bool) {
441 self.write_u8(val as u8);
442 }
443
444 fn write_u16(&mut self, val: u16) {
446 self.write_bytes(&val.to_le_bytes());
447 }
448
449 fn write_i16(&mut self, val: i16) {
451 self.write_bytes(&val.to_le_bytes());
452 }
453
454 fn write_u32(&mut self, val: u32) {
456 self.write_bytes(&val.to_le_bytes());
457 }
458
459 fn write_i32(&mut self, val: i32) {
461 self.write_bytes(&val.to_le_bytes());
462 }
463
464 fn write_u64(&mut self, val: u64) {
466 self.write_bytes(&val.to_le_bytes());
467 }
468
469 fn write_i64(&mut self, val: i64) {
471 self.write_bytes(&val.to_le_bytes());
472 }
473
474 fn write_u128(&mut self, val: u128) {
476 self.write_bytes(&val.to_le_bytes());
477 }
478
479 fn write_i128(&mut self, val: i128) {
481 self.write_bytes(&val.to_le_bytes());
482 }
483
484 fn write_u256(&mut self, val: u256) {
486 self.write_bytes(&val.to_le_bytes());
487 }
488
489 fn write_i256(&mut self, val: i256) {
491 self.write_bytes(&val.to_le_bytes());
492 }
493
494 fn write_f32(&mut self, val: f32) {
496 self.write_bytes(&val.to_le_bytes());
497 }
498
499 fn write_f64(&mut self, val: f64) {
501 self.write_bytes(&val.to_le_bytes());
502 }
503}
504
505#[cfg(test)]
506pub mod test {
507 use super::*;
508 use crate::{
509 bflatn_from::serialize_row_from_page, blob_store::HashMapBlobStore, page::tests::hash_unmodified_save_get,
510 row_type_visitor::row_type_visitor,
511 };
512 use proptest::{prelude::*, prop_assert_eq, proptest};
513 use spacetimedb_sats::algebraic_value::ser::ValueSerializer;
514 use spacetimedb_sats::proptest::generate_typed_row;
515
516 proptest! {
517 #![proptest_config(ProptestConfig::with_cases(if cfg!(miri) { 8 } else { 2048 }))]
518 #[test]
519 fn av_serde_round_trip_through_page((ty, val) in generate_typed_row()) {
520 let ty: RowTypeLayout = ty.into();
521 let mut page = Page::new(ty.size());
522 let visitor = row_type_visitor(&ty);
523 let blob_store = &mut HashMapBlobStore::default();
524
525 let hash_pre_ins = hash_unmodified_save_get(&mut page);
526
527 let (offset, _) = unsafe { write_row_to_page(&mut page, blob_store, &visitor, &ty, &val).unwrap() };
528
529 let hash_pre_ser = hash_unmodified_save_get(&mut page);
530 assert_ne!(hash_pre_ins, hash_pre_ser);
531
532 let read_val = unsafe { serialize_row_from_page(ValueSerializer, &page, blob_store, offset, &ty) }
533 .unwrap().into_product().unwrap();
534
535 prop_assert_eq!(val, read_val);
536 assert_eq!(hash_pre_ser, *page.unmodified_hash().unwrap());
537 }
538 }
539}