1use super::{
6 blob_store::BlobStore,
7 indexes::{Bytes, PageOffset, RowPointer, SquashedOffset},
8 page::{GranuleOffsetIter, Page, VarView},
9 page_pool::PagePool,
10 pages::Pages,
11 table::BlobNumBytes,
12 util::range_move,
13 var_len::{VarLenGranule, VarLenMembers, VarLenRef},
14};
15use spacetimedb_sats::{
16 bsatn::{self, to_writer, DecodeError},
17 buffer::BufWriter,
18 de::DeserializeSeed as _,
19 i256,
20 layout::{
21 align_to, AlgebraicTypeLayout, HasLayout, ProductTypeLayoutView, RowTypeLayout, SumTypeLayout, VarLenType,
22 },
23 u256, AlgebraicType, AlgebraicValue, ProductValue, SumValue,
24};
25use thiserror::Error;
26
/// Errors that can occur while writing a row into a page (or set of pages) in BFLATN form.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    /// Decoding the BSATN input failed; the underlying [`DecodeError`] is forwarded unchanged.
    #[error(transparent)]
    Decode(#[from] DecodeError),
    /// The value being written did not match the type it was written against.
    /// Carries the expected type and the offending value.
    #[error("Expected a value of type {0:?}, but found {1:?}")]
    WrongType(AlgebraicType, AlgebraicValue),
    /// An error reported by the page layer (`super::page::Error`), forwarded unchanged.
    #[error(transparent)]
    PageError(#[from] super::page::Error),
    /// An error reported by the pages layer (`super::pages::Error`), forwarded unchanged.
    #[error(transparent)]
    PagesError(#[from] super::pages::Error),
}
38
39pub unsafe fn write_row_to_pages_bsatn(
51 pool: &PagePool,
52 pages: &mut Pages,
53 visitor: &impl VarLenMembers,
54 blob_store: &mut dyn BlobStore,
55 ty: &RowTypeLayout,
56 mut bytes: &[u8],
57 squashed_offset: SquashedOffset,
58) -> Result<(RowPointer, BlobNumBytes), Error> {
59 let val = ty.product().deserialize(bsatn::Deserializer::new(&mut bytes))?;
60 unsafe { write_row_to_pages(pool, pages, visitor, blob_store, ty, &val, squashed_offset) }
61}
62
63pub unsafe fn write_row_to_pages(
75 pool: &PagePool,
76 pages: &mut Pages,
77 visitor: &impl VarLenMembers,
78 blob_store: &mut dyn BlobStore,
79 ty: &RowTypeLayout,
80 val: &ProductValue,
81 squashed_offset: SquashedOffset,
82) -> Result<(RowPointer, BlobNumBytes), Error> {
83 let num_granules = required_var_len_granules_for_row(val);
84
85 match pages.with_page_to_insert_row(pool, ty.size(), num_granules, |page| {
86 unsafe { write_row_to_page(page, blob_store, visitor, ty, val) }
93 })? {
94 (page, Ok((offset, blob_inserted))) => {
95 Ok((RowPointer::new(false, page, offset, squashed_offset), blob_inserted))
96 }
97 (_, Err(e)) => Err(e),
98 }
99}
100
101pub unsafe fn write_row_to_page(
117 page: &mut Page,
118 blob_store: &mut dyn BlobStore,
119 visitor: &impl VarLenMembers,
120 ty: &RowTypeLayout,
121 val: &ProductValue,
122) -> Result<(PageOffset, BlobNumBytes), Error> {
123 let fixed_row_size = ty.size();
124 let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size)? };
127
128 let (mut fixed, var_view) = page.split_fixed_var_mut();
130 let mut serialized = BflatnSerializedRowBuffer {
131 fixed_buf: fixed.get_row_mut(fixed_offset, fixed_row_size),
132 curr_offset: 0,
133 var_view,
134 last_allocated_var_len_index: 0,
135 large_blob_insertions: Vec::new(),
136 };
137
138 if let Err(e) = serialized.write_product(ty.product(), val) {
140 unsafe { serialized.roll_back_var_len_allocations(visitor) };
142 unsafe { fixed.free(fixed_offset, fixed_row_size) };
146 return Err(e);
147 }
148
149 let blob_store_inserted_bytes = serialized.write_large_blobs(blob_store);
151
152 Ok((fixed_offset, blob_store_inserted_bytes))
153}
154
/// The state kept while serializing one row into a page.
struct BflatnSerializedRowBuffer<'page> {
    /// The bytes of the row's fixed-length part; all fixed-size writes land here.
    fixed_buf: &'page mut Bytes,

    /// Offset into `fixed_buf` at which the next write happens.
    curr_offset: usize,

    /// How many `VarLenRef`s have been written to `fixed_buf` so far.
    /// Bounds how many var-len objects a rollback frees.
    last_allocated_var_len_index: usize,

    /// Large blobs whose insertion is deferred until the row was written successfully,
    /// each paired with the `VarLenRef` that was allocated for it.
    large_blob_insertions: Vec<(VarLenRef, Vec<u8>)>,

    /// Mutable view of the page's var-len area, used to allocate, write and free var-len objects.
    var_view: VarView<'page>,
}
177
178impl BflatnSerializedRowBuffer<'_> {
179 unsafe fn roll_back_var_len_allocations(&mut self, visitor: &impl VarLenMembers) {
185 let visitor_iter = unsafe { visitor.visit_var_len(self.fixed_buf) };
191 for vlr in visitor_iter.take(self.last_allocated_var_len_index) {
192 unsafe { self.var_view.free_object_ignore_blob(*vlr) };
196 }
197 }
198
199 fn write_large_blobs(mut self, blob_store: &mut dyn BlobStore) -> BlobNumBytes {
201 let mut blob_store_inserted_bytes = BlobNumBytes::default();
202 for (vlr, value) in self.large_blob_insertions {
203 unsafe {
208 blob_store_inserted_bytes += self.var_view.write_large_blob_hash_to_granule(blob_store, &value, vlr);
209 }
210 }
211 blob_store_inserted_bytes
212 }
213
214 fn write_value(&mut self, ty: &AlgebraicTypeLayout, val: &AlgebraicValue) -> Result<(), Error> {
216 debug_assert_eq!(
217 self.curr_offset,
218 align_to(self.curr_offset, ty.align()),
219 "curr_offset {} insufficiently aligned for type {:#?}",
220 self.curr_offset,
221 val,
222 );
223
224 match (ty, val) {
225 (AlgebraicTypeLayout::Sum(ty), AlgebraicValue::Sum(val)) => self.write_sum(ty, val)?,
229 (AlgebraicTypeLayout::Product(ty), AlgebraicValue::Product(val)) => self.write_product(ty.view(), val)?,
231
232 (&AlgebraicTypeLayout::Bool, AlgebraicValue::Bool(val)) => self.write_bool(*val),
234 (&AlgebraicTypeLayout::I8, AlgebraicValue::I8(val)) => self.write_i8(*val),
236 (&AlgebraicTypeLayout::U8, AlgebraicValue::U8(val)) => self.write_u8(*val),
237 (&AlgebraicTypeLayout::I16, AlgebraicValue::I16(val)) => self.write_i16(*val),
238 (&AlgebraicTypeLayout::U16, AlgebraicValue::U16(val)) => self.write_u16(*val),
239 (&AlgebraicTypeLayout::I32, AlgebraicValue::I32(val)) => self.write_i32(*val),
240 (&AlgebraicTypeLayout::U32, AlgebraicValue::U32(val)) => self.write_u32(*val),
241 (&AlgebraicTypeLayout::I64, AlgebraicValue::I64(val)) => self.write_i64(*val),
242 (&AlgebraicTypeLayout::U64, AlgebraicValue::U64(val)) => self.write_u64(*val),
243 (&AlgebraicTypeLayout::I128, AlgebraicValue::I128(val)) => self.write_i128(val.0),
244 (&AlgebraicTypeLayout::U128, AlgebraicValue::U128(val)) => self.write_u128(val.0),
245 (&AlgebraicTypeLayout::I256, AlgebraicValue::I256(val)) => self.write_i256(**val),
246 (&AlgebraicTypeLayout::U256, AlgebraicValue::U256(val)) => self.write_u256(**val),
247 (&AlgebraicTypeLayout::F32, AlgebraicValue::F32(val)) => self.write_f32((*val).into()),
249 (&AlgebraicTypeLayout::F64, AlgebraicValue::F64(val)) => self.write_f64((*val).into()),
250
251 (&AlgebraicTypeLayout::String, AlgebraicValue::String(val)) => self.write_string(val)?,
254
255 (AlgebraicTypeLayout::VarLen(VarLenType::Array(_)), val @ AlgebraicValue::Array(_)) => {
258 self.write_av_bsatn(val)?
259 }
260
261 (ty, val) => Err(Error::WrongType(ty.algebraic_type(), val.clone()))?,
263 }
264
265 Ok(())
266 }
267
268 fn write_sum(&mut self, ty: &SumTypeLayout, val: &SumValue) -> Result<(), Error> {
270 let SumValue { tag, ref value } = *val;
272 let variant_ty = &ty.variants[tag as usize];
273 let variant_offset = self.curr_offset + ty.offset_of_variant_data(tag);
274 let tag_offset = self.curr_offset + ty.offset_of_tag();
275
276 self.curr_offset = variant_offset;
278 self.write_value(&variant_ty.ty, value)?;
279
280 self.curr_offset = tag_offset;
282 self.write_u8(tag);
283
284 Ok(())
285 }
286
287 fn write_product(&mut self, ty: ProductTypeLayoutView<'_>, val: &ProductValue) -> Result<(), Error> {
289 if ty.elements.len() != val.elements.len() {
296 return Err(Error::WrongType(
297 ty.algebraic_type(),
298 AlgebraicValue::Product(val.clone()),
299 ));
300 }
301
302 let base_offset = self.curr_offset;
303
304 for (elt_ty, elt) in ty.elements.iter().zip(val.elements.iter()) {
305 self.curr_offset = base_offset + elt_ty.offset as usize;
306 self.write_value(&elt_ty.ty, elt)?;
307 }
308 Ok(())
309 }
310
311 fn write_string(&mut self, val: &str) -> Result<(), Error> {
314 let val = val.as_bytes();
315
316 let (vlr, in_blob) = self.var_view.alloc_for_slice(val)?;
318 if in_blob {
319 self.defer_insert_large_blob(vlr, val.to_vec());
320 }
321
322 self.write_var_len_ref(vlr);
324 Ok(())
325 }
326
327 fn write_av_bsatn(&mut self, val: &AlgebraicValue) -> Result<(), Error> {
330 let len_in_bytes = bsatn_len(val);
332 let (vlr, in_blob) = self.var_view.alloc_for_len(len_in_bytes)?;
333
334 self.write_var_len_ref(vlr);
336
337 if in_blob {
338 let mut bytes = Vec::with_capacity(len_in_bytes);
341 val.encode(&mut bytes);
342 self.defer_insert_large_blob(vlr, bytes);
343 } else {
344 let iter = unsafe { self.var_view.granule_offset_iter(vlr.first_granule) };
350 let mut writer = GranuleBufWriter { buf: None, iter };
351 to_writer(&mut writer, val).unwrap();
352 }
353
354 struct GranuleBufWriter<'vv, 'page> {
356 buf: Option<(PageOffset, usize)>,
359 iter: GranuleOffsetIter<'page, 'vv>,
361 }
362 impl BufWriter for GranuleBufWriter<'_, '_> {
363 fn put_slice(&mut self, mut slice: &[u8]) {
364 while !slice.is_empty() {
365 let (offset, start) = match self.buf.take() {
366 Some(buf @ (_, start)) if start < VarLenGranule::DATA_SIZE => buf,
368 _ => {
370 let next = self.iter.next();
371 debug_assert!(next.is_some());
372 let next = unsafe { next.unwrap_unchecked() };
375 (next, 0)
376 }
377 };
378
379 let capacity_remains = VarLenGranule::DATA_SIZE - start;
382 debug_assert!(capacity_remains > 0);
383 let extend_len = capacity_remains.min(slice.len());
384 let (extend_with, rest) = slice.split_at(extend_len);
385 let write_to = unsafe { self.iter.get_mut_data(offset, start) };
390
391 for (to, byte) in write_to.iter_mut().zip(extend_with) {
393 *to = *byte;
394 }
395
396 slice = rest;
397 self.buf = Some((offset, start + extend_len));
398 }
399 }
400 }
401
402 Ok(())
403 }
404
405 fn write_var_len_ref(&mut self, val: VarLenRef) {
407 self.write_u16(val.length_in_bytes);
408 self.write_u16(val.first_granule.0);
409
410 self.last_allocated_var_len_index += 1;
413 }
414
415 fn defer_insert_large_blob(&mut self, vlr: VarLenRef, obj_bytes: Vec<u8>) {
417 self.large_blob_insertions.push((vlr, obj_bytes));
418 }
419
420 fn write_bytes<const N: usize>(&mut self, bytes: &[u8; N]) {
423 self.fixed_buf[range_move(0..N, self.curr_offset)].copy_from_slice(bytes);
424 self.curr_offset += N;
425 }
426
427 fn write_u8(&mut self, val: u8) {
429 self.write_bytes(&[val]);
430 }
431
432 fn write_i8(&mut self, val: i8) {
434 self.write_u8(val as u8);
435 }
436
437 fn write_bool(&mut self, val: bool) {
439 self.write_u8(val as u8);
440 }
441
442 fn write_u16(&mut self, val: u16) {
444 self.write_bytes(&val.to_le_bytes());
445 }
446
447 fn write_i16(&mut self, val: i16) {
449 self.write_bytes(&val.to_le_bytes());
450 }
451
452 fn write_u32(&mut self, val: u32) {
454 self.write_bytes(&val.to_le_bytes());
455 }
456
457 fn write_i32(&mut self, val: i32) {
459 self.write_bytes(&val.to_le_bytes());
460 }
461
462 fn write_u64(&mut self, val: u64) {
464 self.write_bytes(&val.to_le_bytes());
465 }
466
467 fn write_i64(&mut self, val: i64) {
469 self.write_bytes(&val.to_le_bytes());
470 }
471
472 fn write_u128(&mut self, val: u128) {
474 self.write_bytes(&val.to_le_bytes());
475 }
476
477 fn write_i128(&mut self, val: i128) {
479 self.write_bytes(&val.to_le_bytes());
480 }
481
482 fn write_u256(&mut self, val: u256) {
484 self.write_bytes(&val.to_le_bytes());
485 }
486
487 fn write_i256(&mut self, val: i256) {
489 self.write_bytes(&val.to_le_bytes());
490 }
491
492 fn write_f32(&mut self, val: f32) {
494 self.write_bytes(&val.to_le_bytes());
495 }
496
497 fn write_f64(&mut self, val: f64) {
499 self.write_bytes(&val.to_le_bytes());
500 }
501}
502
503fn required_var_len_granules_for_row(val: &ProductValue) -> usize {
505 fn traverse_av(val: &AlgebraicValue, count: &mut usize) {
506 match val {
507 AlgebraicValue::Product(val) => traverse_product(val, count),
508 AlgebraicValue::Sum(val) => traverse_av(&val.value, count),
509 AlgebraicValue::Array(_) => add_for_bytestring(bsatn_len(val), count),
510 AlgebraicValue::String(val) => add_for_bytestring(val.len(), count),
511 _ => (),
512 }
513 }
514
515 fn traverse_product(val: &ProductValue, count: &mut usize) {
516 for elt in val {
517 traverse_av(elt, count);
518 }
519 }
520
521 fn add_for_bytestring(len_in_bytes: usize, count: &mut usize) {
522 *count += VarLenGranule::bytes_to_granules(len_in_bytes).0;
523 }
524
525 let mut required_granules: usize = 0;
526 traverse_product(val, &mut required_granules);
527 required_granules
528}
529
530fn bsatn_len(val: &AlgebraicValue) -> usize {
532 bsatn::to_len(val).unwrap()
537}
538
// Property tests for writing rows into a page and reading them back.
#[cfg(test)]
pub mod test {
    use super::*;
    use crate::{
        bflatn_from::serialize_row_from_page, blob_store::HashMapBlobStore, page::tests::hash_unmodified_save_get,
        row_type_visitor::row_type_visitor,
    };
    use proptest::{prelude::*, prop_assert_eq, proptest};
    use spacetimedb_sats::algebraic_value::ser::ValueSerializer;
    use spacetimedb_sats::proptest::generate_typed_row;

    proptest! {
        // Use only a handful of cases when running under Miri, and 2048 otherwise.
        #![proptest_config(ProptestConfig::with_cases(if cfg!(miri) { 8 } else { 2048 }))]
        // Writes a generated row into a fresh page, reads it back,
        // and checks that the value read equals the value written.
        #[test]
        fn av_serde_round_trip_through_page((ty, val) in generate_typed_row()) {
            let ty: RowTypeLayout = ty.into();
            let mut page = Page::new(ty.size());
            let visitor = row_type_visitor(&ty);
            let blob_store = &mut HashMapBlobStore::default();

            // Hash of the page before anything was inserted.
            let hash_pre_ins = hash_unmodified_save_get(&mut page);

            let (offset, _) = unsafe { write_row_to_page(&mut page, blob_store, &visitor, &ty, &val).unwrap() };

            // Inserting the row must have changed the page's hash.
            let hash_pre_ser = hash_unmodified_save_get(&mut page);
            assert_ne!(hash_pre_ins, hash_pre_ser);

            let read_val = unsafe { serialize_row_from_page(ValueSerializer, &page, blob_store, offset, &ty) }
                .unwrap().into_product().unwrap();

            prop_assert_eq!(val, read_val);
            // Reading the row back must leave the stored hash of the page as it was.
            assert_eq!(hash_pre_ser, *page.unmodified_hash().unwrap());
        }
    }
}