// spacetimedb_table/bflatn_to.rs

//! Provides the functions [`write_row_to_pages(pages, blob_store, ty, val)`]
//! and [`write_row_to_page(page, blob_store, visitor, ty, val)`]
//! which write `val: ProductValue` typed at `ty` to `pages` and `page` respectively.

5use crate::layout::ProductTypeLayoutView;
6
7use super::{
8    blob_store::BlobStore,
9    indexes::{Bytes, PageOffset, RowPointer, SquashedOffset},
10    layout::{
11        align_to, bsatn_len, required_var_len_granules_for_row, AlgebraicTypeLayout, HasLayout, RowTypeLayout,
12        SumTypeLayout, VarLenType,
13    },
14    page::{GranuleOffsetIter, Page, VarView},
15    page_pool::PagePool,
16    pages::Pages,
17    table::BlobNumBytes,
18    util::range_move,
19    var_len::{VarLenGranule, VarLenMembers, VarLenRef},
20};
21use spacetimedb_sats::{
22    bsatn::{self, to_writer, DecodeError},
23    buffer::BufWriter,
24    de::DeserializeSeed as _,
25    i256, u256, AlgebraicType, AlgebraicValue, ProductValue, SumValue,
26};
27use thiserror::Error;
28
/// Errors that can occur while writing a BFLATN row to a page or to a set of pages.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    /// BSATN decoding of the input row bytes failed.
    #[error(transparent)]
    Decode(#[from] DecodeError),
    /// The provided value did not match the expected type layout.
    #[error("Expected a value of type {0:?}, but found {1:?}")]
    WrongType(AlgebraicType, AlgebraicValue),
    /// An error bubbled up from page-level allocation.
    #[error(transparent)]
    PageError(#[from] super::page::Error),
    /// An error bubbled up from the multi-page allocator.
    #[error(transparent)]
    PagesError(#[from] super::pages::Error),
}
40
/// Deserializes a BSATN-encoded row from `bytes`, typed at `ty`,
/// and writes it to `pages`,
/// using `blob_store` as needed to write large blobs.
///
/// Returns [`Error::Decode`] if `bytes` is not valid BSATN for `ty`'s product type.
///
/// # Safety
///
/// `pages` must be specialized to store rows of `ty`.
/// This includes that its `visitor` must be prepared to visit var-len members within `ty`,
/// and must do so in the same order as a `VarLenVisitorProgram` for `ty` would,
/// i.e. by monotonically increasing offsets.
pub unsafe fn write_row_to_pages_bsatn(
    pool: &PagePool,
    pages: &mut Pages,
    visitor: &impl VarLenMembers,
    blob_store: &mut dyn BlobStore,
    ty: &RowTypeLayout,
    mut bytes: &[u8],
    squashed_offset: SquashedOffset,
) -> Result<(RowPointer, BlobNumBytes), Error> {
    // Decode `bytes` into a `ProductValue` typed at `ty`,
    // then delegate to the value-based writer.
    let val = ty.product().deserialize(bsatn::Deserializer::new(&mut bytes))?;
    // SAFETY: forwarded directly from our own safety requirements.
    unsafe { write_row_to_pages(pool, pages, visitor, blob_store, ty, &val, squashed_offset) }
}
64
65/// Writes `row` typed at `ty` to `pages`
66/// using `blob_store` as needed to write large blobs.
67///
68/// Panics if `val` is not of type `ty`.
69///
70/// # Safety
71///
72/// `pages` must be specialized to store rows of `ty`.
73/// This includes that its `visitor` must be prepared to visit var-len members within `ty`,
74/// and must do so in the same order as a `VarLenVisitorProgram` for `ty` would,
75/// i.e. by monotonically increasing offsets.
76pub unsafe fn write_row_to_pages(
77    pool: &PagePool,
78    pages: &mut Pages,
79    visitor: &impl VarLenMembers,
80    blob_store: &mut dyn BlobStore,
81    ty: &RowTypeLayout,
82    val: &ProductValue,
83    squashed_offset: SquashedOffset,
84) -> Result<(RowPointer, BlobNumBytes), Error> {
85    let num_granules = required_var_len_granules_for_row(val);
86
87    match pages.with_page_to_insert_row(pool, ty.size(), num_granules, |page| {
88        // SAFETY:
89        // - Caller promised that `pages` is suitable for storing instances of `ty`
90        //   so `page` is also suitable.
91        // - Caller promised that `visitor` is prepared to visit for `ty`
92        //   and in the same order as a `VarLenVisitorProgram` for `ty` would.
93        // - `visitor` came from `pages` which we can trust to visit in the right order.
94        unsafe { write_row_to_page(page, blob_store, visitor, ty, val) }
95    })? {
96        (page, Ok((offset, blob_inserted))) => {
97            Ok((RowPointer::new(false, page, offset, squashed_offset), blob_inserted))
98        }
99        (_, Err(e)) => Err(e),
100    }
101}
102
/// Writes `row` typed at `ty` to `page`
/// using `blob_store` as needed to write large blobs
/// and `visitor` to fixup var-len pointers in the fixed-len row part.
///
/// On success, returns the offset of the row's fixed part within `page`
/// and the number of bytes inserted into `blob_store` for large blobs.
///
/// Panics if `val` is not of type `ty`.
///
/// # Safety
///
/// - `page` must be prepared to store instances of `ty`.
///
/// - `visitor` must be prepared to visit var-len members within `ty`,
///   and must do so in the same order as a `VarLenVisitorProgram` for `ty` would,
///   i.e. by monotonically increasing offsets.
///
/// - `page` must use a var-len visitor which visits the same var-len members in the same order.
pub unsafe fn write_row_to_page(
    page: &mut Page,
    blob_store: &mut dyn BlobStore,
    visitor: &impl VarLenMembers,
    ty: &RowTypeLayout,
    val: &ProductValue,
) -> Result<(PageOffset, BlobNumBytes), Error> {
    let fixed_row_size = ty.size();
    // SAFETY: We've used the right `row_size` and we trust that others have too.
    // `RowTypeLayout` also ensures that we satisfy the minimum row size.
    let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size)? };

    // Create the context for writing to `page`.
    let (mut fixed, var_view) = page.split_fixed_var_mut();
    let mut serialized = BflatnSerializedRowBuffer {
        fixed_buf: fixed.get_row_mut(fixed_offset, fixed_row_size),
        curr_offset: 0,
        var_view,
        last_allocated_var_len_index: 0,
        large_blob_insertions: Vec::new(),
    };

    // Write the row to the page. Roll back on any failure,
    // freeing both the var-len granules allocated so far and the fixed-len slot,
    // so a failed insert leaves `page` as it was.
    if let Err(e) = serialized.write_product(ty.product(), val) {
        // SAFETY: The `visitor` is proper for the row type per caller requirements.
        unsafe { serialized.roll_back_var_len_allocations(visitor) };
        // SAFETY:
        // - `fixed_offset` came from `alloc_fixed_len` so it is in bounds of `page`.
        // - `RowTypeLayout::size()` ensures `fixed_offset` is properly aligned for `FreeCellRef`.
        unsafe { fixed.free(fixed_offset, fixed_row_size) };
        return Err(e);
    }

    // Haven't stored large blobs or init those granules with blob hashes yet, so do it now.
    let blob_store_inserted_bytes = serialized.write_large_blobs(blob_store);

    Ok((fixed_offset, blob_store_inserted_bytes))
}
156
/// The writing / serialization context used by the function [`write_row_to_page`].
///
/// The `'page` lifetime ties both `fixed_buf` and `var_view`
/// to the same page the row is being written into.
struct BflatnSerializedRowBuffer<'page> {
    /// The work-in-progress fixed part of the row,
    /// allocated inside the page.
    fixed_buf: &'page mut Bytes,

    /// The current offset into `fixed_buf` at which we are writing.
    ///
    /// The various writing methods will advance `curr_offset`.
    curr_offset: usize,

    /// The number of var-len objects inserted to the page so far.
    ///
    /// Used on failure to roll back exactly the allocations this row made.
    last_allocated_var_len_index: usize,

    /// The deferred large-blob insertions
    /// with `Vec<u8>` being the blob bytes to insert to the blob store
    /// and the `VarLenRef` being the destination to write the blob hash.
    ///
    /// Flushed by [`Self::write_large_blobs`] once the fixed part has been fully written.
    large_blob_insertions: Vec<(VarLenRef, Vec<u8>)>,

    /// The mutable view of the variable section of the page.
    var_view: VarView<'page>,
}
179
180impl BflatnSerializedRowBuffer<'_> {
    /// Rolls back all the var-len allocations made when writing the row.
    ///
    /// Only the first `last_allocated_var_len_index` var-len members have been
    /// initialized in `fixed_buf`, so only those are visited and freed.
    ///
    /// # Safety
    ///
    /// The `visitor` must be proper for the row type.
    unsafe fn roll_back_var_len_allocations(&mut self, visitor: &impl VarLenMembers) {
        // SAFETY:
        // - `fixed_buf` is properly aligned for the row type
        //    and `fixed_buf.len()` matches exactly the size of the row type.
        // - `fixed_buf`'s `VarLenRef`s are initialized up to `last_allocated_var_len_index`.
        // - `visitor` is proper for the row type.
        let visitor_iter = unsafe { visitor.visit_var_len(self.fixed_buf) };
        for vlr in visitor_iter.take(self.last_allocated_var_len_index) {
            // SAFETY: The `vlr` came from the allocation in `write_var_len_obj`
            // which wrote it to the fixed part using `write_var_len_ref`.
            // Thus, it points to a valid `VarLenGranule`.
            // Any blob hash has not been written yet, so ignore blobs when freeing.
            unsafe { self.var_view.free_object_ignore_blob(*vlr) };
        }
    }
200
201    /// Insert all large blobs into `blob_store` and their hashes to their granules.
202    fn write_large_blobs(mut self, blob_store: &mut dyn BlobStore) -> BlobNumBytes {
203        let mut blob_store_inserted_bytes = BlobNumBytes::default();
204        for (vlr, value) in self.large_blob_insertions {
205            // SAFETY: `vlr` was given to us by `alloc_for_slice`
206            // so it is properly aligned for a `VarLenGranule` and in bounds of the page.
207            // However, as it was added to `self.large_blob_insertions`,
208            // we have not yet written the hash to that granule.
209            unsafe {
210                blob_store_inserted_bytes += self.var_view.write_large_blob_hash_to_granule(blob_store, &value, vlr);
211            }
212        }
213        blob_store_inserted_bytes
214    }
215
    /// Writes `val`, an [`AlgebraicValue`] typed at `ty`, to the buffer at `curr_offset`.
    ///
    /// The caller must have positioned `curr_offset` at an offset
    /// that satisfies `ty`'s alignment (checked by the `debug_assert_eq!`).
    fn write_value(&mut self, ty: &AlgebraicTypeLayout, val: &AlgebraicValue) -> Result<(), Error> {
        // NOTE(review): the message says "for type" but formats the *value*;
        // presumably intentional since the value identifies the offending field — confirm.
        debug_assert_eq!(
            self.curr_offset,
            align_to(self.curr_offset, ty.align()),
            "curr_offset {} insufficiently aligned for type {:#?}",
            self.curr_offset,
            val,
        );

        match (ty, val) {
            // For sums, select the type based on the sum tag,
            // write the variant data given the variant type,
            // and finally write the tag.
            (AlgebraicTypeLayout::Sum(ty), AlgebraicValue::Sum(val)) => self.write_sum(ty, val)?,
            // For products, write every element in order.
            (AlgebraicTypeLayout::Product(ty), AlgebraicValue::Product(val)) => self.write_product(ty.view(), val)?,

            // For primitive types, write their contents by LE-encoding.
            (&AlgebraicTypeLayout::Bool, AlgebraicValue::Bool(val)) => self.write_bool(*val),
            // Integer types:
            (&AlgebraicTypeLayout::I8, AlgebraicValue::I8(val)) => self.write_i8(*val),
            (&AlgebraicTypeLayout::U8, AlgebraicValue::U8(val)) => self.write_u8(*val),
            (&AlgebraicTypeLayout::I16, AlgebraicValue::I16(val)) => self.write_i16(*val),
            (&AlgebraicTypeLayout::U16, AlgebraicValue::U16(val)) => self.write_u16(*val),
            (&AlgebraicTypeLayout::I32, AlgebraicValue::I32(val)) => self.write_i32(*val),
            (&AlgebraicTypeLayout::U32, AlgebraicValue::U32(val)) => self.write_u32(*val),
            (&AlgebraicTypeLayout::I64, AlgebraicValue::I64(val)) => self.write_i64(*val),
            (&AlgebraicTypeLayout::U64, AlgebraicValue::U64(val)) => self.write_u64(*val),
            (&AlgebraicTypeLayout::I128, AlgebraicValue::I128(val)) => self.write_i128(val.0),
            (&AlgebraicTypeLayout::U128, AlgebraicValue::U128(val)) => self.write_u128(val.0),
            (&AlgebraicTypeLayout::I256, AlgebraicValue::I256(val)) => self.write_i256(**val),
            (&AlgebraicTypeLayout::U256, AlgebraicValue::U256(val)) => self.write_u256(**val),
            // Float types:
            (&AlgebraicTypeLayout::F32, AlgebraicValue::F32(val)) => self.write_f32((*val).into()),
            (&AlgebraicTypeLayout::F64, AlgebraicValue::F64(val)) => self.write_f64((*val).into()),

            // For strings, we reserve space for a `VarLenRef`
            // and push the bytes as a var-len object.
            (&AlgebraicTypeLayout::String, AlgebraicValue::String(val)) => self.write_string(val)?,

            // For arrays, we reserve space for a `VarLenRef`
            // and push the bytes, after BSATN encoding, as a var-len object.
            (AlgebraicTypeLayout::VarLen(VarLenType::Array(_)), val @ AlgebraicValue::Array(_)) => {
                self.write_av_bsatn(val)?
            }

            // If the type doesn't match the value, return an error.
            (ty, val) => Err(Error::WrongType(ty.algebraic_type(), val.clone()))?,
        }

        Ok(())
    }
269
270    /// Write a `val`, a [`SumValue`], typed at `ty`, to the buffer.
271    fn write_sum(&mut self, ty: &SumTypeLayout, val: &SumValue) -> Result<(), Error> {
272        // Extract sum value components and variant type, and offsets.
273        let SumValue { tag, ref value } = *val;
274        let variant_ty = &ty.variants[tag as usize];
275        let variant_offset = self.curr_offset + ty.offset_of_variant_data(tag);
276        let tag_offset = self.curr_offset + ty.offset_of_tag();
277
278        // Write the variant value at `variant_offset`.
279        self.curr_offset = variant_offset;
280        self.write_value(&variant_ty.ty, value)?;
281
282        // Write the variant value at `tag_offset`.
283        self.curr_offset = tag_offset;
284        self.write_u8(tag);
285
286        Ok(())
287    }
288
289    /// Write an `val`, a [`ProductValue`], typed at `ty`, to the buffer.
290    fn write_product(&mut self, ty: ProductTypeLayoutView<'_>, val: &ProductValue) -> Result<(), Error> {
291        // `Iterator::zip` silently drops elements if the two iterators have different lengths,
292        // so we need to check that our `ProductValue` has the same number of elements
293        // as our `ProductTypeLayout` to be sure it's typed correctly.
294        // Otherwise, if the value is too long, we'll discard its fields (whatever),
295        // or if it's too long, we'll leave some fields in the page "uninit"
296        // (actually valid-unconstrained) (very bad).
297        if ty.elements.len() != val.elements.len() {
298            return Err(Error::WrongType(
299                ty.algebraic_type(),
300                AlgebraicValue::Product(val.clone()),
301            ));
302        }
303
304        let base_offset = self.curr_offset;
305
306        for (elt_ty, elt) in ty.elements.iter().zip(val.elements.iter()) {
307            self.curr_offset = base_offset + elt_ty.offset as usize;
308            self.write_value(&elt_ty.ty, elt)?;
309        }
310        Ok(())
311    }
312
313    /// Write the string `str` to the var-len section
314    /// and a `VarLenRef` to the fixed buffer and advance the `curr_offset`.
315    fn write_string(&mut self, val: &str) -> Result<(), Error> {
316        let val = val.as_bytes();
317
318        // Write `val` to the page. The handle is `vlr`.
319        let (vlr, in_blob) = self.var_view.alloc_for_slice(val)?;
320        if in_blob {
321            self.defer_insert_large_blob(vlr, val.to_vec());
322        }
323
324        // Write `vlr` to the fixed part.
325        self.write_var_len_ref(vlr);
326        Ok(())
327    }
328
    /// Writes `val` BSATN-encoded to the var-len section
    /// and a `VarLenRef` to the fixed buffer and advances the `curr_offset`.
    ///
    /// Small values are streamed directly into the page's granule chain;
    /// values too large for the page are BSATN-encoded to a `Vec<u8>`
    /// and deferred for insertion into the blob store.
    fn write_av_bsatn(&mut self, val: &AlgebraicValue) -> Result<(), Error> {
        // Allocate space.
        let len_in_bytes = bsatn_len(val);
        let (vlr, in_blob) = self.var_view.alloc_for_len(len_in_bytes)?;

        // Write `vlr` to the fixed part.
        self.write_var_len_ref(vlr);

        if in_blob {
            // We won't be storing the large blob in the page,
            // so no point in writing the blob directly to the page.
            let mut bytes = Vec::with_capacity(len_in_bytes);
            val.encode(&mut bytes);
            self.defer_insert_large_blob(vlr, bytes);
        } else {
            // Write directly to the page.
            // SAFETY: `vlr.first_granule` points to a granule
            // even though the granule's data is not initialized as of yet.
            // Note that the granule stores valid-unconstrained bytes (i.e. they are not uninit),
            // but they may be leftovers from a previous allocation.
            let iter = unsafe { self.var_view.granule_offset_iter(vlr.first_granule) };
            let mut writer = GranuleBufWriter { buf: None, iter };
            // `GranuleBufWriter::put_slice` cannot fail,
            // so this `unwrap` is presumably unreachable — confirm against `to_writer`'s error paths.
            to_writer(&mut writer, val).unwrap();
        }

        /// A `BufWriter` that writes directly to a page,
        /// spreading the written bytes across the chain of granules
        /// yielded by `iter`.
        struct GranuleBufWriter<'vv, 'page> {
            /// The offset to the granule being written to
            /// and how much has been written to it already.
            buf: Option<(PageOffset, usize)>,
            /// The iterator for the offsets to all the granule we'll write to.
            iter: GranuleOffsetIter<'page, 'vv>,
        }
        impl BufWriter for GranuleBufWriter<'_, '_> {
            fn put_slice(&mut self, mut slice: &[u8]) {
                // Consume `slice` granule by granule, each holding up to `DATA_SIZE` bytes.
                while !slice.is_empty() {
                    let (offset, start) = match self.buf.take() {
                        // Still have some to write to this granule.
                        Some(buf @ (_, start)) if start < VarLenGranule::DATA_SIZE => buf,
                        // First granule or the current one is full.
                        _ => {
                            let next = self.iter.next();
                            debug_assert!(next.is_some());
                            // SAFETY: The iterator length is exactly such that
                            // `next.is_none() == slice.is_empty()`.
                            let next = unsafe { next.unwrap_unchecked() };
                            (next, 0)
                        }
                    };

                    // Derive how much we can add to this granule
                    // and only take that much from `slice`.
                    let capacity_remains = VarLenGranule::DATA_SIZE - start;
                    debug_assert!(capacity_remains > 0);
                    let extend_len = capacity_remains.min(slice.len());
                    let (extend_with, rest) = slice.split_at(extend_len);
                    // The section of the granule data to write to.
                    // SAFETY:
                    // - `offset` came from `self.iter`, which only yields valid offsets.
                    // - `start < VarLenGranule::DATA_SIZE` was ensured above.
                    let write_to = unsafe { self.iter.get_mut_data(offset, start) };

                    // Write to the granule.
                    for (to, byte) in write_to.iter_mut().zip(extend_with) {
                        *to = *byte;
                    }

                    // Remember where the next `put_slice` should resume within this granule.
                    slice = rest;
                    self.buf = Some((offset, start + extend_len));
                }
            }
        }

        Ok(())
    }
406
407    /// Write a `VarLenRef` to the fixed buffer and advance the `curr_offset`.
408    fn write_var_len_ref(&mut self, val: VarLenRef) {
409        self.write_u16(val.length_in_bytes);
410        self.write_u16(val.first_granule.0);
411
412        // Keep track of how many var len objects we've added so far
413        // so that we can free them on failure.
414        self.last_allocated_var_len_index += 1;
415    }
416
    /// Defers the insertion of a large blob:
    /// `obj_bytes` will later be inserted into the blob store
    /// and the blob's hash written to the granule pointed to by `vlr`
    /// (see [`Self::write_large_blobs`]).
    fn defer_insert_large_blob(&mut self, vlr: VarLenRef, obj_bytes: Vec<u8>) {
        self.large_blob_insertions.push((vlr, obj_bytes));
    }
421
422    /// Write `bytes: &[u8; N]` starting at the current offset
423    /// and advance the offset by `N`.
424    fn write_bytes<const N: usize>(&mut self, bytes: &[u8; N]) {
425        self.fixed_buf[range_move(0..N, self.curr_offset)].copy_from_slice(bytes);
426        self.curr_offset += N;
427    }
428
429    /// Write a `u8` to the fixed buffer and advance the `curr_offset`.
430    fn write_u8(&mut self, val: u8) {
431        self.write_bytes(&[val]);
432    }
433
434    /// Write an `i8` to the fixed buffer and advance the `curr_offset`.
435    fn write_i8(&mut self, val: i8) {
436        self.write_u8(val as u8);
437    }
438
439    /// Write a `bool` to the fixed buffer and advance the `curr_offset`.
440    fn write_bool(&mut self, val: bool) {
441        self.write_u8(val as u8);
442    }
443
444    /// Write a `u16` to the fixed buffer and advance the `curr_offset`.
445    fn write_u16(&mut self, val: u16) {
446        self.write_bytes(&val.to_le_bytes());
447    }
448
449    /// Write an `i16` to the fixed buffer and advance the `curr_offset`.
450    fn write_i16(&mut self, val: i16) {
451        self.write_bytes(&val.to_le_bytes());
452    }
453
454    /// Write a `u32` to the fixed buffer and advance the `curr_offset`.
455    fn write_u32(&mut self, val: u32) {
456        self.write_bytes(&val.to_le_bytes());
457    }
458
459    /// Write an `i32` to the fixed buffer and advance the `curr_offset`.
460    fn write_i32(&mut self, val: i32) {
461        self.write_bytes(&val.to_le_bytes());
462    }
463
464    /// Write a `u64` to the fixed buffer and advance the `curr_offset`.
465    fn write_u64(&mut self, val: u64) {
466        self.write_bytes(&val.to_le_bytes());
467    }
468
469    /// Write an `i64` to the fixed buffer and advance the `curr_offset`.
470    fn write_i64(&mut self, val: i64) {
471        self.write_bytes(&val.to_le_bytes());
472    }
473
474    /// Write a `u128` to the fixed buffer and advance the `curr_offset`.
475    fn write_u128(&mut self, val: u128) {
476        self.write_bytes(&val.to_le_bytes());
477    }
478
479    /// Write an `i128` to the fixed buffer and advance the `curr_offset`.
480    fn write_i128(&mut self, val: i128) {
481        self.write_bytes(&val.to_le_bytes());
482    }
483
484    /// Write a `u256` to the fixed buffer and advance the `curr_offset`.
485    fn write_u256(&mut self, val: u256) {
486        self.write_bytes(&val.to_le_bytes());
487    }
488
489    /// Write an `i256` to the fixed buffer and advance the `curr_offset`.
490    fn write_i256(&mut self, val: i256) {
491        self.write_bytes(&val.to_le_bytes());
492    }
493
494    /// Write a `f32` to the fixed buffer and advance the `curr_offset`.
495    fn write_f32(&mut self, val: f32) {
496        self.write_bytes(&val.to_le_bytes());
497    }
498
499    /// Write a `f64` to the fixed buffer and advance the `curr_offset`.
500    fn write_f64(&mut self, val: f64) {
501        self.write_bytes(&val.to_le_bytes());
502    }
503}
504
#[cfg(test)]
pub mod test {
    use super::*;
    use crate::{
        bflatn_from::serialize_row_from_page, blob_store::HashMapBlobStore, page::tests::hash_unmodified_save_get,
        row_type_visitor::row_type_visitor,
    };
    use proptest::{prelude::*, prop_assert_eq, proptest};
    use spacetimedb_sats::algebraic_value::ser::ValueSerializer;
    use spacetimedb_sats::proptest::generate_typed_row;

    proptest! {
        // Use far fewer cases under Miri, where execution is much slower.
        #![proptest_config(ProptestConfig::with_cases(if cfg!(miri) { 8 } else { 2048 }))]
        // Property: writing a random well-typed row into a page and reading it back
        // is lossless, and only the write (not the read) modifies the page.
        #[test]
        fn av_serde_round_trip_through_page((ty, val) in generate_typed_row()) {
            let ty: RowTypeLayout = ty.into();
            let mut page = Page::new(ty.size());
            let visitor = row_type_visitor(&ty);
            let blob_store = &mut HashMapBlobStore::default();

            // Snapshot the page hash before the insert...
            let hash_pre_ins = hash_unmodified_save_get(&mut page);

            let (offset, _) = unsafe { write_row_to_page(&mut page, blob_store, &visitor, &ty, &val).unwrap() };

            // ...and after: inserting must have changed the page contents.
            let hash_pre_ser = hash_unmodified_save_get(&mut page);
            assert_ne!(hash_pre_ins, hash_pre_ser);

            // Read the row back out of the page as an `AlgebraicValue`.
            let read_val = unsafe { serialize_row_from_page(ValueSerializer, &page, blob_store, offset, &ty) }
                .unwrap().into_product().unwrap();

            // The round trip is lossless, and reading did not modify the page.
            prop_assert_eq!(val, read_val);
            assert_eq!(hash_pre_ser, *page.unmodified_hash().unwrap());
        }
    }
}