spacetimedb_table/
table.rs

1use crate::{
2    bflatn_to::write_row_to_pages_bsatn,
3    layout::AlgebraicTypeLayout,
4    static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator},
5    table_index::TableIndexPointIter,
6};
7
8use super::{
9    bflatn_from::serialize_row_from_page,
10    bflatn_to::{write_row_to_pages, Error},
11    blob_store::BlobStore,
12    eq::eq_row_in_page,
13    eq_to_pv::eq_row_in_page_to_pv,
14    indexes::{Bytes, PageIndex, PageOffset, RowHash, RowPointer, Size, SquashedOffset, PAGE_DATA_SIZE},
15    layout::RowTypeLayout,
16    page::{FixedLenRowsIter, Page},
17    pages::Pages,
18    pointer_map::PointerMap,
19    read_column::{ReadColumn, TypeError},
20    row_hash::hash_row_in_page,
21    row_type_visitor::{row_type_visitor, VarLenVisitorProgram},
22    static_assert_size,
23    static_layout::StaticLayout,
24    table_index::{TableIndex, TableIndexRangeIter},
25    var_len::VarLenMembers,
26    MemoryUsage,
27};
28use core::ops::RangeBounds;
29use core::{fmt, ptr};
30use core::{
31    hash::{Hash, Hasher},
32    hint::unreachable_unchecked,
33};
34use derive_more::{Add, AddAssign, From, Sub, SubAssign};
35use enum_as_inner::EnumAsInner;
36use smallvec::SmallVec;
37use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned};
38use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId};
39use spacetimedb_sats::{
40    algebraic_value::ser::ValueSerializer,
41    bsatn::{self, ser::BsatnError, ToBsatn},
42    i256,
43    product_value::InvalidFieldError,
44    satn::Satn,
45    ser::{Serialize, Serializer},
46    u256, AlgebraicValue, ProductType, ProductValue,
47};
48use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema, type_for_generate::PrimitiveType};
49use std::{
50    collections::{btree_map, BTreeMap},
51    sync::Arc,
52};
53use thiserror::Error;
54
55/// The number of bytes used by, added to, or removed from a [`Table`]'s share of a [`BlobStore`].
56#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign, SubAssign)]
57pub struct BlobNumBytes(usize);
58
59impl MemoryUsage for BlobNumBytes {}
60
61pub type SeqIdList = SmallVec<[SequenceId; 4]>;
62static_assert_size!(SeqIdList, 24);
63
64/// A database table containing the row schema, the rows, and indices.
65///
66/// The table stores the rows in a page manager
67/// and uses an internal map to ensure that no identical row is stored more than once.
68#[derive(Debug, PartialEq, Eq)]
69pub struct Table {
70    /// Page manager and row layout grouped together, for `RowRef` purposes.
71    inner: TableInner,
72    /// Maps `RowHash -> [RowPointer]` where a [`RowPointer`] points into `pages`.
73    /// A [`PointerMap`] is effectively a specialized unique index on all the columns.
74    ///
75    /// In tables without any other unique constraints,
76    /// the pointer map is used to enforce set semantics,
77    /// i.e. to prevent duplicate rows.
78    /// If `self.indexes` contains at least one unique index,
79    /// duplicate rows are impossible regardless, so this will be `None`.
80    pointer_map: Option<PointerMap>,
81    /// The indices associated with a set of columns of the table.
82    pub indexes: BTreeMap<IndexId, TableIndex>,
83    /// The schema of the table, from which the type, and other details are derived.
84    pub schema: Arc<TableSchema>,
85    /// `SquashedOffset::TX_STATE` or `SquashedOffset::COMMITTED_STATE`
86    /// depending on whether this is a tx scratchpad table
87    /// or a committed table.
88    squashed_offset: SquashedOffset,
89    /// Store number of rows present in table.
90    pub row_count: u64,
91    /// Stores the sum total number of bytes that each blob object in the table occupies.
92    ///
93    /// Note that the [`HashMapBlobStore`] does ref-counting and de-duplication,
94    /// but this sum will count an object each time its hash is mentioned, rather than just once.
95    blob_store_bytes: BlobNumBytes,
96    /// Indicates whether this is a scheduler table or not.
97    ///
98    /// This is an optimization to avoid checking the schema in e.g., `InstanceEnv::{insert, update}`.
99    is_scheduler: bool,
100}
101
102/// The part of a `Table` concerned only with storing rows.
103///
104/// Separated from the "outer" parts of `Table`, especially the `indexes`,
105/// so that `RowRef` can borrow only the `TableInner`,
106/// while other mutable references to the `indexes` exist.
107/// This is necessary because index insertions and deletions take a `RowRef` as an argument,
108/// from which they [`ReadColumn::read_column`] their keys.
109#[derive(Debug, PartialEq, Eq)]
110pub(crate) struct TableInner {
111    /// The type of rows this table stores, with layout information included.
112    row_layout: RowTypeLayout,
113    /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion,
114    /// if the [`RowTypeLayout`] has a static BSATN length and layout.
115    ///
116    /// A [`StaticBsatnValidator`] is also included.
117    /// It's used to validate BSATN-encoded rows before converting to BFLATN.
118    static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
119    /// The visitor program for `row_layout`.
120    ///
121    /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it.
122    visitor_prog: VarLenVisitorProgram,
123    /// The page manager that holds rows
124    /// including both their fixed and variable components.
125    pages: Pages,
126}
127
128impl TableInner {
129    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
130    ///
131    /// # Safety
132    ///
133    /// The requirement is that `table.is_row_present(ptr)` must hold,
134    /// where `table` is the `Table` which contains this `TableInner`.
135    /// That is, `ptr` must refer to a row within `self`
136    /// which was previously inserted and has not been deleted since.
137    ///
138    /// This means:
139    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
140    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
141    ///   and must refer to a valid, live row in that page.
142    /// - The `SquashedOffset` of `ptr` must match the enclosing table's `table.squashed_offset`.
143    ///
144    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
145    /// and has not been passed to [`Table::delete(table, ..)`]
146    /// is sufficient to demonstrate all of these properties.
147    unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
148        // SAFETY: Forward caller requirements.
149        unsafe { RowRef::new(self, blob_store, ptr) }
150    }
151
152    fn try_page_and_offset(&self, ptr: RowPointer) -> Option<(&Page, PageOffset)> {
153        (ptr.page_index().idx() < self.pages.len()).then(|| (&self.pages[ptr.page_index()], ptr.page_offset()))
154    }
155
156    /// Returns the page and page offset that `ptr` points to.
157    fn page_and_offset(&self, ptr: RowPointer) -> (&Page, PageOffset) {
158        self.try_page_and_offset(ptr).unwrap()
159    }
160}
161
162static_assert_size!(Table, 264);
163
164impl MemoryUsage for Table {
165    fn heap_usage(&self) -> usize {
166        let Self {
167            inner,
168            pointer_map,
169            indexes,
170            // MEMUSE: intentionally ignoring schema
171            schema: _,
172            squashed_offset,
173            row_count,
174            blob_store_bytes,
175            is_scheduler,
176        } = self;
177        inner.heap_usage()
178            + pointer_map.heap_usage()
179            + indexes.heap_usage()
180            + squashed_offset.heap_usage()
181            + row_count.heap_usage()
182            + blob_store_bytes.heap_usage()
183            + is_scheduler.heap_usage()
184    }
185}
186
187impl MemoryUsage for TableInner {
188    fn heap_usage(&self) -> usize {
189        let Self {
190            row_layout,
191            static_layout,
192            visitor_prog,
193            pages,
194        } = self;
195        row_layout.heap_usage() + static_layout.heap_usage() + visitor_prog.heap_usage() + pages.heap_usage()
196    }
197}
198
199/// There was already a row with the same value.
200#[derive(Error, Debug, PartialEq, Eq)]
201#[error("Duplicate insertion of row {0:?} violates set semantics")]
202pub struct DuplicateError(pub RowPointer);
203
204/// Various error that can happen on table insertion.
205#[derive(Error, Debug, PartialEq, Eq, EnumAsInner)]
206pub enum InsertError {
207    /// There was already a row with the same value.
208    #[error(transparent)]
209    Duplicate(#[from] DuplicateError),
210
211    /// Couldn't write the row to the page manager.
212    #[error(transparent)]
213    Bflatn(#[from] super::bflatn_to::Error),
214
215    /// Some index-related error occurred.
216    #[error(transparent)]
217    IndexError(#[from] UniqueConstraintViolation),
218}
219
220/// Errors that can occur while trying to read a value via bsatn.
221#[derive(Error, Debug)]
222pub enum ReadViaBsatnError {
223    #[error(transparent)]
224    BSatnError(#[from] BsatnError),
225
226    #[error(transparent)]
227    DecodeError(#[from] DecodeError),
228}
229
230// Public API:
231impl Table {
232    /// Creates a new empty table with the given `schema` and `squashed_offset`.
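    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes an
    /// `Arc<TableSchema>` named `schema` is already in scope.
    ///
    /// ```ignore
    /// // A committed-state table starts out empty, with a pointer map and no indexes.
    /// let table = Table::new(schema.clone(), SquashedOffset::COMMITTED_STATE);
    /// assert_eq!(table.num_rows(), 0);
    /// ```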
233    pub fn new(schema: Arc<TableSchema>, squashed_offset: SquashedOffset) -> Self {
234        let row_layout: RowTypeLayout = schema.get_row_type().clone().into();
235        let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout)));
236        let visitor_prog = row_type_visitor(&row_layout);
237        // By default, we start off with an empty pointer map,
238        // which is removed when the first unique index is added.
239        let pm = Some(PointerMap::default());
240        Self::new_with_indexes_capacity(schema, row_layout, static_layout, visitor_prog, squashed_offset, pm)
241    }
242
243    /// Returns whether this is a scheduler table.
244    pub fn is_scheduler(&self) -> bool {
245        self.is_scheduler
246    }
247
248    /// Check if the `row` conflicts with any unique index on `self`,
249    /// and if there is a conflict, return `Err`.
250    ///
251    /// `is_deleted` is a predicate which, for a given row pointer,
252    /// returns true if and only if that row should be ignored.
253    /// While checking unique constraints against the committed state,
254    /// `MutTxId::insert` will ignore rows which are listed in the delete table.
255    ///
256    /// # Safety
257    ///
258    /// `row.row_layout() == self.row_layout()` must hold.
259    pub unsafe fn check_unique_constraints<'a, I: Iterator<Item = (&'a IndexId, &'a TableIndex)>>(
260        &'a self,
261        row: RowRef<'_>,
262        adapt: impl FnOnce(btree_map::Iter<'a, IndexId, TableIndex>) -> I,
263        mut is_deleted: impl FnMut(RowPointer) -> bool,
264    ) -> Result<(), UniqueConstraintViolation> {
265        for (&index_id, index) in adapt(self.indexes.iter()).filter(|(_, index)| index.is_unique()) {
266            // SAFETY: Caller promised that `row` has the same layout as `self`.
267            // Thus, as `index.indexed_columns` is in-bounds of `self`'s layout,
268            // it's also in-bounds of `row`'s layout.
269            let value = unsafe { row.project_unchecked(&index.indexed_columns) };
270            if index.seek_point(&value).next().is_some_and(|ptr| !is_deleted(ptr)) {
271                return Err(self.build_error_unique(index, index_id, value));
272            }
273        }
274        Ok(())
275    }
276
277    /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
278    ///
279    /// On success, returns the hash, if any, of the newly-inserted row,
280    /// and a `RowRef` referring to the row.
281    /// The hash is only computed if this table has a [`PointerMap`],
282    /// i.e., does not have any unique indexes.
283    /// If the table has unique indexes,
284    /// the returned `Option<RowHash>` will be `None`.
285    ///
286    /// When a row equal to `row` already exists in `self`,
287    /// returns `InsertError::Duplicate(existing_row_pointer)`,
288    /// where `existing_row_pointer` is a `RowPointer` which identifies the existing row.
289    /// In this case, the duplicate is not inserted,
290    /// but internal data structures may be altered in ways that affect performance and fragmentation.
291    ///
292    /// TODO(error-handling): describe errors from `write_row_to_pages` and return meaningful errors.
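    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `table`,
    /// a mutable `BlobStore` named `blob_store`, and a `ProductValue` named
    /// `product_value` matching the table's row type.
    ///
    /// ```ignore
    /// let (hash, row_ref) = table.insert(&mut blob_store, &product_value)?;
    /// // `hash` is `Some(_)` only for tables that still use a pointer map,
    /// // i.e. tables without any unique index.
    /// let ptr = row_ref.pointer();
    /// ```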
293    pub fn insert<'a>(
294        &'a mut self,
295        blob_store: &'a mut dyn BlobStore,
296        row: &ProductValue,
297    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
298        // Optimistically insert the `row` before checking any constraints
299        // under the assumption that errors (unique constraint & set semantic violations) are rare.
300        let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?;
301        let row_ptr = row_ref.pointer();
302
303        // Confirm the insertion, checking any constraints, removing the physical row on error.
304        // SAFETY: We just inserted `row_ptr`, so it must be present.
305        let (hash, row_ptr) = unsafe { self.confirm_insertion(blob_store, row_ptr, blob_bytes) }?;
306        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
307        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, row_ptr) };
308        Ok((hash, row_ref))
309    }
310
311    /// Physically inserts `row` into the page
312    /// without inserting it logically into the pointer map.
313    ///
314    /// This is useful when we need to insert a row temporarily to get back a `RowPointer`.
315    /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
316    pub fn insert_physically_pv<'a>(
317        &'a mut self,
318        blob_store: &'a mut dyn BlobStore,
319        row: &ProductValue,
320    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
321        // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
322        // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
323        let (ptr, blob_bytes) = unsafe {
324            write_row_to_pages(
325                &mut self.inner.pages,
326                &self.inner.visitor_prog,
327                blob_store,
328                &self.inner.row_layout,
329                row,
330                self.squashed_offset,
331            )
332        }?;
333        // SAFETY: We just inserted `ptr`, so it must be present.
334        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
335
336        Ok((row_ref, blob_bytes))
337    }
338
339    /// Physically insert a `row`, encoded in BSATN, into this table,
340    /// storing its large var-len members in the `blob_store`.
341    ///
342    /// On success, returns the hash of the newly-inserted row,
343    /// and a `RowRef` referring to the row.
344    ///
345    /// This does not check for set semantic or unique constraints.
346    ///
347    /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`.
348    /// In this case, a call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
349    ///
350    /// When `row` is not valid BSATN at the table's row type,
351    /// an error is returned and there will be nothing for the caller to revert.
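    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `table`,
    /// a `blob_store`, and a `bsatn_row` holding a BSATN encoding of a row of
    /// the table's row type.
    ///
    /// ```ignore
    /// let (row_ref, blob_bytes) = table.insert_physically_bsatn(&mut blob_store, &bsatn_row)?;
    /// // The row is only physically present at this point; confirm or revert it afterwards,
    /// // e.g. via `confirm_insertion` or `delete_internal_skip_pointer_map`.
    /// ```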
352    pub fn insert_physically_bsatn<'a>(
353        &'a mut self,
354        blob_store: &'a mut dyn BlobStore,
355        row: &[u8],
356    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
357        // Got a static layout? => Use fast-path insertion.
358        let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() {
359            // Before inserting, validate the row, ensuring type safety.
360            // SAFETY: The `static_validator` was derived from the same row layout as the static layout.
361            unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?;
362
363            let fixed_row_size = self.inner.row_layout.size();
364            let squashed_offset = self.squashed_offset;
365            let res = self
366                .inner
367                .pages
368                .with_page_to_insert_row(fixed_row_size, 0, |page| {
369                    // SAFETY: We've used the right `row_size` and we trust that others have too.
370                    // `RowTypeLayout` also ensures that we satisfy the minimum row size.
371                    let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?;
372                    let (mut fixed, _) = page.split_fixed_var_mut();
373                    let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size);
374                    // SAFETY:
375                    // - We've validated that `row` is of sufficient length.
376                    // - The `fixed_buf` is exactly the right `fixed_row_size`.
377                    unsafe { static_layout.deserialize_row_into(fixed_buf, row) };
378                    Ok(fixed_offset)
379                })
380                .map_err(Error::PagesError)?;
381            match res {
382                (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()),
383                (_, Err(e)) => return Err(e),
384            }
385        } else {
386            // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
387            // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
388            unsafe {
389                write_row_to_pages_bsatn(
390                    &mut self.inner.pages,
391                    &self.inner.visitor_prog,
392                    blob_store,
393                    &self.inner.row_layout,
394                    row,
395                    self.squashed_offset,
396                )
397            }?
398        };
399
400        // SAFETY: We just inserted `ptr`, so it must be present.
401        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
402
403        Ok((row_ref, blob_bytes))
404    }
405
406    /// Returns all the columns with sequences that need generation for this `row`.
407    ///
408    /// # Safety
409    ///
410    /// `self.is_row_present(row)` must hold.
411    pub unsafe fn sequence_triggers_for<'a>(
412        &'a self,
413        blob_store: &'a dyn BlobStore,
414        row: RowPointer,
415    ) -> (ColList, SeqIdList) {
416        let sequences = &*self.get_schema().sequences;
417        let row_ty = self.row_layout().product();
418
419        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
420        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) };
421
422        sequences
423            .iter()
424            // Find all the sequences that are triggered by this row.
425            .filter(|seq| {
426                // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements`
427                // as `row_ty` was derived from the same schema as `seq` is part of.
428                let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) };
429                // SAFETY:
430                // - `elem_ty` appears as a column in the row type.
431                // - `AlgebraicValue` is compatible with all types.
432                let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) };
433                val.is_numeric_zero()
434            })
435            .map(|seq| (seq.col_pos, seq.sequence_id))
436            .unzip()
437    }
438
439    /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`.
440    ///
441    /// Truncates the `seq_val` to fit the type of the column.
442    ///
443    /// # Safety
444    ///
445    /// - `self.is_row_present(row)` must hold.
446    /// - `col_id` must be a valid column, with a primitive integer type, of the row type.
447    pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) {
448        let row_ty = self.inner.row_layout.product();
449        // SAFETY: Caller promised that `col_id` was a valid column.
450        let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) };
451        let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else {
452            // SAFETY: Columns with sequences must be primitive types.
453            unsafe { unreachable_unchecked() }
454        };
455
456        let fixed_row_size = self.inner.row_layout.size();
457        let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size);
458
459        fn write<const N: usize>(dst: &mut [u8], offset: u16, bytes: [u8; N]) {
460            let offset = offset as usize;
461            dst[offset..offset + N].copy_from_slice(&bytes);
462        }
463
464        match col_typ {
465            PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()),
466            PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()),
467            PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()),
468            PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()),
469            PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()),
470            PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()),
471            PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()),
472            PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()),
473            PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()),
474            PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()),
475            PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()),
476            PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()),
477            // SAFETY: Columns with sequences must be integer types.
478            PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => unsafe { unreachable_unchecked() },
479        }
480    }
481
482    /// Performs all the checks necessary after having fully decided on a row's contents.
483    ///
484    /// This includes inserting the row into any applicable indices and/or the pointer map.
485    ///
486    /// On `Ok(_)`, statistics of the table are also updated,
487    /// and the `ptr` still points to a valid row, and otherwise not.
488    ///
489    /// # Safety
490    ///
491    /// `self.is_row_present(row)` must hold.
492    pub unsafe fn confirm_insertion<'a>(
493        &'a mut self,
494        blob_store: &'a mut dyn BlobStore,
495        ptr: RowPointer,
496        blob_bytes: BlobNumBytes,
497    ) -> Result<(Option<RowHash>, RowPointer), InsertError> {
498        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
499        let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?;
500        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
501        unsafe { self.insert_into_indices(blob_store, ptr) }?;
502
503        self.update_statistics_added_row(blob_bytes);
504        Ok((hash, ptr))
505    }
506
507    /// Confirms a row update, after first updating indices and checking constraints.
508    ///
509    /// On `Ok(_)`:
510    /// - the statistics of the table are also updated,
511    /// - the `ptr` still points to a valid row.
512    ///
513    /// Otherwise, on `Err(_)`:
514    /// - `ptr` will not point to a valid row,
515    /// - the statistics won't be updated.
516    ///
517    /// # Safety
518    ///
519    /// `self.is_row_present(new_ptr)` and `self.is_row_present(old_ptr)` must hold.
520    pub unsafe fn confirm_update<'a>(
521        &'a mut self,
522        blob_store: &'a mut dyn BlobStore,
523        new_ptr: RowPointer,
524        old_ptr: RowPointer,
525        blob_bytes_added: BlobNumBytes,
526    ) -> Result<RowPointer, UniqueConstraintViolation> {
527        // (1) Remove old row from indices.
528        // SAFETY: Caller promised that `self.is_row_present(old_ptr)` holds.
529        unsafe { self.delete_from_indices(blob_store, old_ptr) };
530
531        // Insert new row into indices.
532        // SAFETY: Caller promised that `self.is_row_present(new_ptr)` holds.
533        let res = unsafe { self.insert_into_indices(blob_store, new_ptr) };
534        if let Err(e) = res {
535            // Undo (1).
536            unsafe { self.insert_into_indices(blob_store, old_ptr) }
537                .expect("re-inserting the old row into indices should always work");
538            return Err(e);
539        }
540
541        // Remove the old row physically.
542        // SAFETY: The physical `old_ptr` still exists.
543        let blob_bytes_removed = unsafe { self.delete_internal_skip_pointer_map(blob_store, old_ptr) };
544        self.update_statistics_deleted_row(blob_bytes_removed);
545
546        // Update statistics.
547        self.update_statistics_added_row(blob_bytes_added);
548        Ok(new_ptr)
549    }
550
551    /// We've added a row, update the statistics to record this.
552    #[inline]
553    fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) {
554        self.row_count += 1;
555        self.blob_store_bytes += blob_bytes;
556    }
557
558    /// We've removed a row, update the statistics to record this.
559    #[inline]
560    fn update_statistics_deleted_row(&mut self, blob_bytes: BlobNumBytes) {
561        self.row_count -= 1;
562        self.blob_store_bytes -= blob_bytes;
563    }
564
565    /// Insert row identified by `ptr` into indices.
566    /// This also checks unique constraints.
567    /// Deletes the row if there were any violations.
568    ///
569    /// SAFETY: `self.is_row_present(row)` must hold.
570    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
571    unsafe fn insert_into_indices<'a>(
572        &'a mut self,
573        blob_store: &'a mut dyn BlobStore,
574        ptr: RowPointer,
575    ) -> Result<(), UniqueConstraintViolation> {
576        let mut index_error = None;
577        for (&index_id, index) in self.indexes.iter_mut() {
578            // SAFETY: We just inserted `ptr`, so it must be present.
579            let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
580            // SAFETY: any index in this table was constructed with the same row type as this table.
581            let violation = unsafe { index.check_and_insert(row_ref) };
582            if violation.is_err() {
583                let cols = &index.indexed_columns;
584                let value = row_ref.project(cols).unwrap();
585                let error = UniqueConstraintViolation::build(&self.schema, index, index_id, value);
586                index_error = Some(error);
587                break;
588            }
589        }
590        if let Some(err) = index_error {
591            // Found unique constraint violation.
592            // Undo the insertion.
593            // SAFETY: We just inserted `ptr`, so it must be present.
594            unsafe {
595                self.delete_unchecked(blob_store, ptr);
596            }
597            return Err(err);
598        }
599        Ok(())
600    }
601
602    /// Finds the [`RowPointer`], if any, to the row in `target_table`
603    /// that is equal to the row `needle_ptr` in `needle_table`,
604    /// using any unique index in `target_table`.
605    ///
606    /// # Safety
607    ///
608    /// - `target_table` and `needle_table` must have the same `row_layout`.
609    /// - `needle_table.is_row_present(needle_ptr)` must hold.
610    unsafe fn find_same_row_via_unique_index(
611        target_table: &Table,
612        needle_table: &Table,
613        needle_bs: &dyn BlobStore,
614        needle_ptr: RowPointer,
615    ) -> Option<RowPointer> {
616        // Use some index (the one with the lowest `IndexId` currently).
617        // TODO(centril): this isn't what we actually want.
618        // Rather, we'd prefer the index with the simplest type,
619        // but this is left as future work as we don't have to optimize this method now.
620        let target_index = target_table
621            .indexes
622            .values()
623            .find(|idx| idx.is_unique())
624            .expect("there should be at least one unique index");
625        // Project the needle row to the columns of the index, and then seek.
626        // As this is a unique index, there are 0-1 rows for this key.
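        // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)` holds.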
627        let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
628        let key = needle_row
629            .project(&target_index.indexed_columns)
630            .expect("needle row should be valid");
631        target_index.seek_point(&key).next().filter(|&target_ptr| {
632            // SAFETY:
633            // - Caller promised that the row layouts were the same.
634            // - We know `target_ptr` exists, as it was in `target_index`, belonging to `target_table`.
635            // - Caller promised that `needle_ptr` is valid for `needle_table`.
636            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
637        })
638    }
639
640    /// Insert the row identified by `ptr` into the table's [`PointerMap`],
641    /// if the table has one.
642    ///
643    /// This checks for set semantic violations.
644    /// If a set semantic conflict (i.e. duplicate row) is detected by the pointer map,
645    /// the row will be deleted and an error returned.
646    /// If the pointer map confirms that the row was unique, returns the `RowHash` of that row.
647    ///
648    /// If this table has no `PointerMap`, returns `Ok(None)`.
649    /// In that case, the row's uniqueness will be verified by [`Self::insert_into_indices`],
650    /// as this table has at least one unique index.
651    ///
652    /// SAFETY: `self.is_row_present(row)` must hold.
653    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
654    unsafe fn insert_into_pointer_map<'a>(
655        &'a mut self,
656        blob_store: &'a mut dyn BlobStore,
657        ptr: RowPointer,
658    ) -> Result<Option<RowHash>, DuplicateError> {
659        if self.pointer_map.is_none() {
660            // No pointer map? Set semantic constraint is checked by a unique index instead.
661            return Ok(None);
662        };
663
664        // SAFETY:
665        // - `self` trivially has the same `row_layout` as `self`.
666        // - Caller promised that `self.is_row_present(row)` holds.
667        let (hash, existing_row) = unsafe { Self::find_same_row_via_pointer_map(self, self, blob_store, ptr, None) };
668
669        if let Some(existing_row) = existing_row {
670            // If an equal row was already present,
671            // roll back our optimistic insert to avoid violating set semantics.
672
673            // SAFETY: Caller promised that `ptr` is a valid row in `self`.
674            unsafe {
675                self.inner
676                    .pages
677                    .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
678            };
679            return Err(DuplicateError(existing_row));
680        }
681
682        // If the optimistic insertion was correct,
683        // i.e. this is not a set-semantic duplicate,
684        // add it to the `pointer_map`.
685        self.pointer_map
686            .as_mut()
687            .expect("pointer map should exist, as it did previously")
688            .insert(hash, ptr);
689
690        Ok(Some(hash))
691    }
692
693    /// Returns the list of pointers to rows which hash to `row_hash`.
694    ///
695    /// If `self` does not have a [`PointerMap`], always returns the empty slice.
696    fn pointers_for(&self, row_hash: RowHash) -> &[RowPointer] {
697        self.pointer_map.as_ref().map_or(&[], |pm| pm.pointers_for(row_hash))
698    }
699
700    /// Using the [`PointerMap`],
701    /// searches `target_table` for a row equal to `needle_table[needle_ptr]`.
702    ///
703    /// Rows are compared for equality by [`eq_row_in_page`].
704    ///
705    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
706    ///
707    /// Used for detecting set-semantic duplicates when inserting
708    /// into tables without any unique constraints.
709    ///
710    /// Does nothing and always returns `None` if `target_table` does not have a `PointerMap`,
711    /// in which case the caller should instead use [`Self::find_same_row_via_unique_index`].
712    ///
713    /// Note that we don't need the blob store to compute equality,
714    /// as content-addressing means it's sufficient to compare the hashes of large blobs.
715    /// (If we see a collision in `BlobHash` we have bigger problems.)
716    ///
717    /// # Safety
718    ///
719    /// - `target_table` and `needle_table` must have the same `row_layout`.
720    /// - `needle_table.is_row_present(needle_ptr)`.
721    pub unsafe fn find_same_row_via_pointer_map(
722        target_table: &Table,
723        needle_table: &Table,
724        needle_bs: &dyn BlobStore,
725        needle_ptr: RowPointer,
726        row_hash: Option<RowHash>,
727    ) -> (RowHash, Option<RowPointer>) {
728        let row_hash = row_hash.unwrap_or_else(|| {
729            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
730            let row_ref = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
731            row_ref.row_hash()
732        });
733
734        // Scan all the row pointers with `row_hash` in `target_table`.
735        let row_ptr = target_table.pointers_for(row_hash).iter().copied().find(|&target_ptr| {
736            // SAFETY:
737            // - Caller promised that the row layouts were the same.
738            // - We know `target_ptr` exists, as it was found in a pointer map.
739            // - Caller promised that `needle_ptr` is valid for `needle_table`.
740            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
741        });
742
743        (row_hash, row_ptr)
744    }
745
746    /// Returns whether the row `target_ptr` in `target_table`
747    /// is exactly equal to the row `needle_ptr` in `needle_table`.
748    ///
749    /// # Safety
750    ///
751    /// - `target_table` and `needle_table` must have the same `row_layout`.
752    /// - `target_table.is_row_present(target_ptr)`.
753    /// - `needle_table.is_row_present(needle_ptr)`.
754    pub unsafe fn eq_row_in_page(
755        target_table: &Table,
756        target_ptr: RowPointer,
757        needle_table: &Table,
758        needle_ptr: RowPointer,
759    ) -> bool {
760        let (target_page, target_offset) = target_table.inner.page_and_offset(target_ptr);
761        let (needle_page, needle_offset) = needle_table.inner.page_and_offset(needle_ptr);
762
763        // SAFETY:
764        // - Caller promised that `target_ptr` is valid, so `target_page` and `target_offset` are both valid.
765        // - Caller promised that `needle_ptr` is valid, so `needle_page` and `needle_offset` are both valid.
766        // - Caller promised that the layouts of `target_table` and `needle_table` are the same,
767        //   so `target_table` applies to both.
768        //   Moreover `(x: Table).inner.static_layout` is always derived from `x.row_layout`.
769        unsafe {
770            eq_row_in_page(
771                target_page,
772                needle_page,
773                target_offset,
774                needle_offset,
775                &target_table.inner.row_layout,
776                target_table.static_layout(),
777            )
778        }
779    }
780
781    /// Searches `target_table` for a row equal to `needle_table[needle_ptr]`,
782    /// and returns the [`RowPointer`] to that row in `target_table`, if it exists.
783    ///
784    /// Searches using the [`PointerMap`] or a unique index, as appropriate for the table.
785    ///
786    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
787    ///
788    /// # Safety
789    ///
790    /// - `target_table` and `needle_table` must have the same `row_layout`.
791    /// - `needle_table.is_row_present(needle_ptr)` must hold.
792    pub unsafe fn find_same_row(
793        target_table: &Table,
794        needle_table: &Table,
795        needle_bs: &dyn BlobStore,
796        needle_ptr: RowPointer,
797        row_hash: Option<RowHash>,
798    ) -> (Option<RowHash>, Option<RowPointer>) {
799        if target_table.pointer_map.is_some() {
800            // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
801            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
802            let (row_hash, row_ptr) = unsafe {
803                Self::find_same_row_via_pointer_map(target_table, needle_table, needle_bs, needle_ptr, row_hash)
804            };
805            (Some(row_hash), row_ptr)
806        } else {
807            (
808                row_hash,
809                // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
810                // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
811                unsafe { Self::find_same_row_via_unique_index(target_table, needle_table, needle_bs, needle_ptr) },
812            )
813        }
814    }
815
816    /// Returns a [`RowRef`] for `ptr` or `None` if the row isn't present.
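    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `table`,
    /// a `blob_store`, and a `ptr` obtained from a prior insert.
    ///
    /// ```ignore
    /// if let Some(row_ref) = table.get_row_ref(&blob_store, ptr) {
    ///     // The row is present; materialize it if needed.
    ///     let pv = row_ref.to_product_value();
    /// }
    /// ```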
817    pub fn get_row_ref<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> Option<RowRef<'a>> {
818        self.is_row_present(ptr)
819            // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
820            .then(|| unsafe { self.get_row_ref_unchecked(blob_store, ptr) })
821    }
822
823    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
824    ///
825    /// # Safety
826    ///
827    /// The requirement is that `self.is_row_present(ptr)` must hold.
828    /// That is, `ptr` must refer to a row within `self`
829    /// which was previously inserted and has not been deleted since.
830    ///
831    /// This means:
832    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
833    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
834    ///   and must refer to a valid, live row in that page.
835    /// - The `SquashedOffset` of `ptr` must match `self.squashed_offset`.
836    ///
837    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
838    /// and has not been passed to [`Table::delete(table, ..)`]
839    /// is sufficient to demonstrate all of these properties.
840    pub unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
841        debug_assert!(self.is_row_present(ptr));
842        // SAFETY: Caller promised that ^-- holds.
843        unsafe { RowRef::new(&self.inner, blob_store, ptr) }
844    }
845
846    /// Deletes a row in the page manager
847    /// without deleting it logically in the pointer map.
848    ///
849    /// # Safety
850    ///
851    /// `ptr` must point to a valid, live row in this table.
852    pub unsafe fn delete_internal_skip_pointer_map(
853        &mut self,
854        blob_store: &mut dyn BlobStore,
855        ptr: RowPointer,
856    ) -> BlobNumBytes {
857        // Delete the physical row.
858        //
859        // SAFETY:
860        // - `ptr` points to a valid row in this table, per our invariants.
861        // - `self.row_size` known to be consistent with `self.pages`,
862        //    as the two are tied together in `Table::new`.
863        unsafe {
864            self.inner
865                .pages
866                .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
867        }
868    }
869
870    /// Deletes the row identified by `ptr` from the table.
871    ///
872    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
873    ///
874    /// NOTE: This method skips updating indexes.
875    /// Use `delete_unchecked` or `delete` to delete a row with index updating.
876    ///
877    /// SAFETY: `self.is_row_present(row)` must hold.
878    unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
879        // Remove the set semantic association.
880        if let Some(pointer_map) = &mut self.pointer_map {
881            // SAFETY: `self.is_row_present(row)` holds.
882            let row = unsafe { RowRef::new(&self.inner, blob_store, ptr) };
883
884            let _remove_result = pointer_map.remove(row.row_hash(), ptr);
885            debug_assert!(_remove_result);
886        }
887
888        // Delete the physical row.
889        // SAFETY: `ptr` points to a valid row in this table as `self.is_row_present(row)` holds.
890        unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) }
891    }
892
893    /// Deletes the row identified by `ptr` from the table.
894    ///
895    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
896    ///
897    /// SAFETY: `self.is_row_present(row)` must hold.
898    unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
899        // Delete row from indices.
900        // Do this before the actual deletion, as `index.delete` needs a `RowRef`
901        // so it can extract the appropriate value.
902        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
903        unsafe { self.delete_from_indices(blob_store, ptr) };
904
905        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
906        unsafe { self.delete_internal(blob_store, ptr) }
907    }
908
909    /// Delete `row_ref` from all the indices of this table.
910    ///
911    /// SAFETY: `self.is_row_present(row)` must hold.
912    unsafe fn delete_from_indices(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer) {
913        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
914        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
915
916        for index in self.indexes.values_mut() {
917            index.delete(row_ref).unwrap();
918        }
919    }
920
921    /// Deletes the row identified by `ptr` from the table.
922    ///
923    /// The function `before` is run on the to-be-deleted row,
924    /// if it is present, before deleting.
925    /// This enables callers to extract the deleted row.
926    /// E.g. applying deletes when squashing/merging a transaction into the committed state
927    /// passes `|row| row.to_product_value()` as `before`
928    /// so that the resulting `ProductValue`s can be passed to the subscription evaluator.
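    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `table`,
    /// a `blob_store`, and a valid `ptr` from a prior insert.
    ///
    /// ```ignore
    /// // Capture the deleted row as a `ProductValue` before it is removed.
    /// let deleted: Option<ProductValue> = table.delete(&mut blob_store, ptr, |row| row.to_product_value());
    /// ```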
929    pub fn delete<'a, R>(
930        &'a mut self,
931        blob_store: &'a mut dyn BlobStore,
932        ptr: RowPointer,
933        before: impl for<'b> FnOnce(RowRef<'b>) -> R,
934    ) -> Option<R> {
935        if !self.is_row_present(ptr) {
936            return None;
937        };
938
939        // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
940        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
941
942        let ret = before(row_ref);
943
944        // SAFETY: We've checked above that `self.is_row_present(ptr)`.
945        let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) };
946        self.update_statistics_deleted_row(blob_bytes_deleted);
947
948        Some(ret)
949    }
950
951    /// If a row exists in `self` which matches `row`
952    /// by [`Table::find_same_row`],
953    /// delete that row.
954    ///
955    /// If a matching row was found, returns the pointer to that row.
956    /// The returned pointer is now invalid, as the row to which it referred has been deleted.
957    ///
958    /// This operation works by temporarily inserting the `row` into `self`,
959    /// checking `find_same_row` on the newly-inserted row,
960    /// deleting the matching row if it exists,
961    /// then deleting the temporary insertion.
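    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `table`,
    /// a `blob_store`, and a `ProductValue` named `row`.
    ///
    /// ```ignore
    /// if let Some(deleted_ptr) = table.delete_equal_row(&mut blob_store, &row)? {
    ///     // `deleted_ptr` identified the now-deleted row and must not be dereferenced.
    /// }
    /// ```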
962    pub fn delete_equal_row(
963        &mut self,
964        blob_store: &mut dyn BlobStore,
965        row: &ProductValue,
966    ) -> Result<Option<RowPointer>, Error> {
967        // Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row.
968        // This must avoid consulting and inserting to the pointer map,
969        // as the row is already present, set-semantically.
970        let (temp_row, _) = self.insert_physically_pv(blob_store, row)?;
971        let temp_ptr = temp_row.pointer();
972
973        // Find the row equal to the passed-in `row`.
974        // This uses one of two approaches.
975        // Either there is a pointer map, so we use that,
976        // or there is at least one unique index, so we use one of them.
977        //
978        // SAFETY:
979        // - `self` trivially has the same `row_layout` as `self`.
980        // - We just inserted `temp_ptr`, so it's valid.
981        let (_, existing_row_ptr) = unsafe { Self::find_same_row(self, self, blob_store, temp_ptr, None) };
982
983        // If an equal row was present, delete it.
984        if let Some(existing_row_ptr) = existing_row_ptr {
985            let blob_bytes_deleted = unsafe {
986                // SAFETY: `find_same_row` ensures that the pointer is valid.
987                self.delete_unchecked(blob_store, existing_row_ptr)
988            };
989            self.update_statistics_deleted_row(blob_bytes_deleted);
990        }
991
992        // Remove the temporary row we inserted in the beginning.
993        // Avoid the pointer map, since we don't want to delete it twice.
994        // SAFETY: `ptr` is valid as we just inserted it.
995        unsafe {
996            self.delete_internal_skip_pointer_map(blob_store, temp_ptr);
997        }
998
999        Ok(existing_row_ptr)
1000    }
1001
1002    /// Returns the row type for rows in this table.
1003    pub fn get_row_type(&self) -> &ProductType {
1004        self.get_schema().get_row_type()
1005    }
1006
1007    /// Returns the schema for this table.
1008    pub fn get_schema(&self) -> &Arc<TableSchema> {
1009        &self.schema
1010    }
1011
1012    /// Runs a mutation on the [`TableSchema`] of this table.
1013    ///
1014    /// This uses a clone-on-write mechanism.
1015    /// If none but `self` refers to the schema, then the mutation will be in-place.
1016    /// Otherwise, the schema must be cloned, mutated,
1017    /// and then the cloned version is written back to the table.
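    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest), mirroring how
    /// [`Self::delete_index`] prunes the schema's index list.
    ///
    /// ```ignore
    /// table.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
    /// ```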
1018    pub fn with_mut_schema(&mut self, with: impl FnOnce(&mut TableSchema)) {
1019        with(Arc::make_mut(&mut self.schema));
1020    }
1021
1022    /// Returns a new [`TableIndex`] for `table`.
1023    pub fn new_index(&self, algo: &IndexAlgorithm, is_unique: bool) -> Result<TableIndex, InvalidFieldError> {
1024        TableIndex::new(self.get_schema().get_row_type(), algo, is_unique)
1025    }
1026
1027    /// Inserts a new `index` into the table.
1028    ///
1029    /// The index will be populated using the rows of the table.
1030    ///
1031    /// # Panics
1032    ///
1033    /// Panics if any row would violate `index`'s unique constraint, if it has one.
1034    ///
1035    /// # Safety
1036    ///
1037    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
1038    pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) {
1039        let rows = self.scan_rows(blob_store);
1040        // SAFETY: Caller promised that table's row type/layout
1041        // matches that which `index` was constructed with.
1042        // It follows that this applies to any `rows`, as required.
1043        let violation = unsafe { index.build_from_rows(rows) };
1044        violation.unwrap_or_else(|ptr| {
1045            panic!("adding `index` should cause no unique constraint violations, but {ptr:?} would")
1046        });
1047        // SAFETY: Forward caller requirement.
1048        unsafe { self.add_index(index_id, index) };
1049    }
1050
1051    /// Adds an index to the table without populating it.
1052    ///
1053    /// # Safety
1054    ///
1055    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
1056    pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) {
1057        let is_unique = index.is_unique();
1058        self.indexes.insert(index_id, index);
1059
1060        // Remove the pointer map, if any.
1061        if is_unique {
1062            self.pointer_map = None;
1063        }
1064    }
1065
1066    /// Removes an index from the table.
1067    ///
1068    /// Returns whether an index existed with `index_id`.
1069    pub fn delete_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId) -> bool {
1070        let index = self.indexes.remove(&index_id);
1071        let ret = index.is_some();
1072
1073        // If we removed the last unique index, add a pointer map.
1074        if index.is_some_and(|i| i.is_unique()) && !self.indexes.values().any(|idx| idx.is_unique()) {
1075            self.rebuild_pointer_map(blob_store);
1076        }
1077
1078        // Remove index from schema.
1079        //
1080        // This will likely do a clone-on-write, since over time
1081        // the schema may have acquired other referents.
1082        self.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
1083
1084        ret
1085    }
1086
1087    /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s.
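    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `table` and a `blob_store`.
    ///
    /// ```ignore
    /// for row_ref in table.scan_rows(&blob_store) {
    ///     // Materialize each row; this walks the row layout and allocates.
    ///     let _pv = row_ref.to_product_value();
    /// }
    /// ```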
1088    pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> {
1089        TableScanIter {
1090            current_page: None, // Will be filled by the iterator.
1091            current_page_idx: PageIndex(0),
1092            table: self,
1093            blob_store,
1094        }
1095    }
1096
1097    /// Returns this table combined with the index for [`IndexId`], if any.
1098    pub fn get_index_by_id_with_table<'a>(
1099        &'a self,
1100        blob_store: &'a dyn BlobStore,
1101        index_id: IndexId,
1102    ) -> Option<TableAndIndex<'a>> {
1103        Some(TableAndIndex {
1104            table: self,
1105            blob_store,
1106            index: self.get_index_by_id(index_id)?,
1107        })
1108    }
1109
1110    /// Returns the [`TableIndex`] for this [`IndexId`].
1111    pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&TableIndex> {
1112        self.indexes.get(&index_id)
1113    }
1114
1115    /// Returns this table combined with the first index with `cols`, if any.
1116    pub fn get_index_by_cols_with_table<'a>(
1117        &'a self,
1118        blob_store: &'a dyn BlobStore,
1119        cols: &ColList,
1120    ) -> Option<TableAndIndex<'a>> {
1121        let (_, index) = self.get_index_by_cols(cols)?;
1122        Some(TableAndIndex {
1123            table: self,
1124            blob_store,
1125            index,
1126        })
1127    }
1128
1129    /// Returns the first [`TableIndex`] with the given [`ColList`].
1130    pub fn get_index_by_cols(&self, cols: &ColList) -> Option<(IndexId, &TableIndex)> {
1131        self.indexes
1132            .iter()
1133            .find(|(_, index)| &index.indexed_columns == cols)
1134            .map(|(id, idx)| (*id, idx))
1135    }
1136
1137    /// Clones the structure of this table into a new one with
1138    /// the same schema, visitor program, and indices.
1139    /// The new table will be completely empty
1140    /// and will use the given `squashed_offset` instead of that of `self`.
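    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); it assumes a `committed_table`.
    ///
    /// ```ignore
    /// // Make an empty tx-state scratchpad table with the same schema and index structure.
    /// let tx_table = committed_table.clone_structure(SquashedOffset::TX_STATE);
    /// ```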
1141    pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self {
1142        let schema = self.schema.clone();
1143        let layout = self.row_layout().clone();
1144        let sbl = self.inner.static_layout.clone();
1145        let visitor = self.inner.visitor_prog.clone();
1146        // If we had a pointer map, we'll have one in the cloned one as well, but empty.
1147        let pm = self.pointer_map.as_ref().map(|_| PointerMap::default());
1148        let mut new = Table::new_with_indexes_capacity(schema, layout, sbl, visitor, squashed_offset, pm);
1149
1150        // Clone the index structure. The table is empty, so no need to `build_from_rows`.
1151        for (&index_id, index) in self.indexes.iter() {
1152            new.indexes.insert(index_id, index.clone_structure());
1153        }
1154        new
1155    }
1156
1157    /// Returns the number of bytes occupied by the pages and the blob store.
1158    /// Note that the result can be more than the actual physical size occupied by the table
1159    /// because the blob store implementation can do internal optimizations.
1160    /// For more details, refer to the documentation of `self.blob_store_bytes`.
1161    pub fn bytes_occupied_overestimate(&self) -> usize {
1162        (self.num_pages() * PAGE_DATA_SIZE) + (self.blob_store_bytes.0)
1163    }
1164
1165    /// Reset the internal storage of `self` to be `pages`.
1166    ///
1167    /// This recomputes the pointer map based on the `pages`,
1168    /// but does not recompute indexes.
1169    ///
1170    /// Used when restoring from a snapshot.
1171    ///
1172    /// # Safety
1173    ///
1174    /// The schema of rows stored in the `pages` must exactly match `self.schema` and `self.inner.row_layout`.
1175    pub unsafe fn set_pages(&mut self, pages: Vec<Box<Page>>, blob_store: &dyn BlobStore) {
1176        self.inner.pages.set_contents(pages, self.inner.row_layout.size());
1177
1178        // Recompute table metadata based on the new pages.
1179        // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate.
1180        self.compute_row_count(blob_store);
1181        self.rebuild_pointer_map(blob_store);
1182    }
1183
1184    /// Returns the number of rows resident in this table.
1185    ///
1186    /// This scales in runtime with the number of pages in the table.
1187    pub fn num_rows(&self) -> u64 {
1188        self.pages().iter().map(|page| page.num_rows() as u64).sum()
1189    }
1190
1191    #[cfg(test)]
1192    fn reconstruct_num_rows(&self) -> u64 {
1193        self.pages().iter().map(|page| page.reconstruct_num_rows() as u64).sum()
1194    }
1195
1196    /// Returns the number of bytes used by rows resident in this table.
1197    ///
1198    /// This includes data bytes, padding bytes and some overhead bytes,
1199    /// as described in the docs for [`Page::bytes_used_by_rows`],
1200    /// but *does not* include:
1201    ///
1202    /// - Unallocated space within pages.
1203    /// - Per-page overhead (e.g. page headers).
1204    /// - Table overhead (e.g. the [`RowTypeLayout`], [`PointerMap`], [`TableSchema`], etc.).
1205    /// - Indexes.
1206    /// - Large blobs in the [`BlobStore`].
1207    ///
1208    /// Of these, the caller should inspect the blob store in order to account for memory usage by large blobs,
1209    /// and call [`Self::bytes_used_by_index_keys`] to account for indexes,
1210    /// but we intend to eat all the other overheads when billing.
1211    pub fn bytes_used_by_rows(&self) -> u64 {
1212        self.pages()
1213            .iter()
1214            .map(|page| page.bytes_used_by_rows(self.inner.row_layout.size()) as u64)
1215            .sum()
1216    }
1217
1218    #[cfg(test)]
1219    fn reconstruct_bytes_used_by_rows(&self) -> u64 {
1220        self.pages()
1221            .iter()
1222            .map(|page| unsafe {
1223            // SAFETY: `page` is in `self`, and was constructed using `self.inner.row_layout` and `self.inner.visitor_prog`,
1224                // so the three are mutually consistent.
1225                page.reconstruct_bytes_used_by_rows(self.inner.row_layout.size(), &self.inner.visitor_prog)
1226            } as u64)
1227            .sum()
1228    }
1229
1230    /// Returns the number of rows (or [`RowPointer`]s, more accurately)
1231    /// stored in indexes by this table.
1232    ///
1233    /// This scales in runtime with the number of pages in the table, as it is derived from [`Self::num_rows`].
1234    pub fn num_rows_in_indexes(&self) -> u64 {
1235        // Assume that each index contains all rows in the table.
1236        self.num_rows() * self.indexes.len() as u64
1237    }
1238
1239    /// Returns the number of bytes used by keys stored in indexes by this table.
1240    ///
1241    /// This method scales in runtime with the number of indexes in the table,
1242    /// but not with the number of pages or rows.
1243    ///
1244    /// Key size is measured using a metric called "key size" or "data size,"
1245    /// which is intended to capture the number of live user-supplied bytes,
1246    /// not including representational overhead.
1247    /// This is distinct from the BFLATN size measured by [`Self::bytes_used_by_rows`].
1248    /// See the trait [`crate::table_index::KeySize`] for specifics on the metric measured.
1249    pub fn bytes_used_by_index_keys(&self) -> u64 {
1250        self.indexes.values().map(|idx| idx.num_key_bytes()).sum()
1251    }
1252}
1253
1254/// A reference to a single row within a table.
1255///
1256/// # Safety
1257///
1258/// Having a `r: RowRef` is a proof that [`r.pointer()`](RowRef::pointer) refers to a valid row.
1259/// This makes constructing a `RowRef`, i.e., `RowRef::new`, an `unsafe` operation.
1260#[derive(Copy, Clone)]
1261pub struct RowRef<'a> {
1262    /// The table that has the row at `self.pointer`.
1263    table: &'a TableInner,
1264    /// The blob store used in case there are blob hashes to resolve.
1265    blob_store: &'a dyn BlobStore,
1266    /// The pointer to the row in `self.table`.
1267    pointer: RowPointer,
1268}
1269
1270impl fmt::Debug for RowRef<'_> {
1271    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1272        fmt.debug_struct("RowRef")
1273            .field("pointer", &self.pointer)
1274            .finish_non_exhaustive()
1275    }
1276}
1277
1278impl<'a> RowRef<'a> {
1279    /// Construct a `RowRef` to the row at `pointer` within `table`.
1280    ///
1281    /// # Safety
1282    ///
1283    /// `pointer` must refer to a row within `table`
1284    /// which was previously inserted and has not been deleted since.
1285    ///
1286    /// This means:
1287    /// - The `PageIndex` of `pointer` must be in-bounds for `table.pages`.
1288    /// - The `PageOffset` of `pointer` must be properly aligned for the row type of `table`,
1289    ///   and must refer to a valid, live row in that page.
1290    /// - The `SquashedOffset` of `pointer` must match `table.squashed_offset`.
1291    ///
1292    /// Showing that `pointer` was the result of a call to `table.insert`
1293    /// and has not been passed to `table.delete`
1294    /// is sufficient to demonstrate all of these properties.
1295    unsafe fn new(table: &'a TableInner, blob_store: &'a dyn BlobStore, pointer: RowPointer) -> Self {
1296        Self {
1297            table,
1298            blob_store,
1299            pointer,
1300        }
1301    }
1302
1303    /// Extract a `ProductValue` from the table.
1304    ///
1305    /// This is a potentially expensive operation,
1306    /// as it must walk the table's `ProductTypeLayout`
1307    /// and heap-allocate various substructures of the `ProductValue`.
1308    pub fn to_product_value(&self) -> ProductValue {
1309        let res = self
1310            .serialize(ValueSerializer)
1311            .unwrap_or_else(|x| match x {})
1312            .into_product();
1313        // SAFETY: the top layer of a row when serialized is always a product.
1314        unsafe { res.unwrap_unchecked() }
1315    }
1316
1317    /// Check that the `idx`th column of the row type stored by `self` is compatible with `T`,
1318    /// and read the value of that column from `self`.
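    ///
    /// A small sketch, assuming `row_ref` comes from a table whose column 0 is an `i32`:
    ///
    /// ```ignore
    /// let v: i32 = row_ref.read_col(0).unwrap();
    /// // Requesting an incompatible type returns a `TypeError` rather than panicking.
    /// assert!(row_ref.read_col::<u64>(0).is_err());
    /// ```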
1319    #[inline]
1320    pub fn read_col<T: ReadColumn>(self, col: impl Into<ColId>) -> Result<T, TypeError> {
1321        T::read_column(self, col.into().idx())
1322    }
1323
1324    /// Construct a projection of the row at `self` by extracting the `cols`.
1325    ///
1326    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1327    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
1328    ///
1329    /// # Safety
1330    ///
1331    /// - `cols` must not specify any column which is out-of-bounds for the row `self`.
1332    pub unsafe fn project_unchecked(self, cols: &ColList) -> AlgebraicValue {
1333        let col_layouts = &self.row_layout().product().elements;
1334
1335        if let Some(head) = cols.as_singleton() {
1336            let head = head.idx();
1337            // SAFETY: caller promised that `head` is in-bounds of `col_layouts`.
1338            let col_layout = unsafe { col_layouts.get_unchecked(head) };
1339            // SAFETY:
1340            // - `col_layout` was just derived from the row layout.
1341            // - `AlgebraicValue` is compatible with any `col_layout`.
1342            // - `self` is a valid row and offsetting to `col_layout` is valid.
1343            return unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) };
1344        }
1345        let mut elements = Vec::with_capacity(cols.len() as usize);
1346        for col in cols.iter() {
1347            let col = col.idx();
1348            // SAFETY: caller promised that any `col` is in-bounds of `col_layouts`.
1349            let col_layout = unsafe { col_layouts.get_unchecked(col) };
1350            // SAFETY:
1351            // - `col_layout` was just derived from the row layout.
1352            // - `AlgebraicValue` is compatible with any `col_layout`.
1353            // - `self` is a valid row and offsetting to `col_layout` is valid.
1354            elements.push(unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) });
1355        }
1356        AlgebraicValue::product(elements)
1357    }
1358
1359    /// Construct a projection of the row at `self` by extracting the `cols`.
1360    ///
1361    /// Returns an error if `cols` specifies an index which is out-of-bounds for the row at `self`.
1362    ///
1363    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1364    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
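    ///
    /// A sketch, assuming `row_ref` refers to a row of type `(i32, i32)` holding `(1, 2)`:
    ///
    /// ```ignore
    /// let single = row_ref.project(&ColList::from(ColId(0))).unwrap();
    /// assert_eq!(single, AlgebraicValue::I32(1));
    ///
    /// let pair = row_ref.project(&ColList::from([ColId(0), ColId(1)])).unwrap();
    /// assert_eq!(pair, AlgebraicValue::product(vec![AlgebraicValue::I32(1), AlgebraicValue::I32(2)]));
    /// ```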
1365    pub fn project(self, cols: &ColList) -> Result<AlgebraicValue, InvalidFieldError> {
1366        if let Some(head) = cols.as_singleton() {
1367            return self.read_col(head).map_err(|_| head.into());
1368        }
1369        let mut elements = Vec::with_capacity(cols.len() as usize);
1370        for col in cols.iter() {
1371            let col_val = self.read_col(col).map_err(|err| match err {
1372                TypeError::WrongType { .. } => {
1373                    unreachable!("AlgebraicValue::read_column never returns a `TypeError::WrongType`")
1374                }
1375                TypeError::IndexOutOfBounds { .. } => col,
1376            })?;
1377            elements.push(col_val);
1378        }
1379        Ok(AlgebraicValue::product(elements))
1380    }
1381
1382    /// Returns the raw row pointer for this row reference.
1383    pub fn pointer(&self) -> RowPointer {
1384        self.pointer
1385    }
1386
1387    /// Returns the blob store that any [`crate::blob_store::BlobHash`]es within the row refer to.
1388    pub(crate) fn blob_store(&self) -> &dyn BlobStore {
1389        self.blob_store
1390    }
1391
1392    /// Return the layout of the row.
1393    ///
1394    /// All rows within the same table will have the same layout.
1395    pub fn row_layout(&self) -> &RowTypeLayout {
1396        &self.table.row_layout
1397    }
1398
1399    /// Returns the page the row is in and the offset of the row within that page.
1400    pub fn page_and_offset(&self) -> (&Page, PageOffset) {
1401        self.table.page_and_offset(self.pointer())
1402    }
1403
1404    /// Returns the bytes for the fixed portion of this row.
1405    pub(crate) fn get_row_data(&self) -> &Bytes {
1406        let (page, offset) = self.page_and_offset();
1407        page.get_row_data(offset, self.table.row_layout.size())
1408    }
1409
1410    /// Returns the hash of the row referred to by `self`.
1411    pub fn row_hash(&self) -> RowHash {
1412        RowHash(RowHash::hasher_builder().hash_one(self))
1413    }
1414
1415    /// Returns the static layout for this row reference, if any.
1416    pub fn static_layout(&self) -> Option<&StaticLayout> {
1417        self.table.static_layout.as_ref().map(|(s, _)| s)
1418    }
1419
1420    /// BSATN-encode the row referred to by `self` into `scratch`, then deserialize a `T` from those bytes.
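    ///
    /// A sketch, assuming `MyRow` is a hypothetical type that derives `Deserialize`
    /// and matches the table's row type:
    ///
    /// ```ignore
    /// let mut scratch = Vec::new();
    /// let decoded: MyRow = row_ref.read_via_bsatn(&mut scratch).unwrap();
    /// ```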
1421    pub fn read_via_bsatn<T>(&self, scratch: &mut Vec<u8>) -> Result<T, ReadViaBsatnError>
1422    where
1423        T: DeserializeOwned,
1424    {
1425        self.to_bsatn_extend(scratch)?;
1426        Ok(bsatn::from_slice::<T>(scratch)?)
1427    }
1428
1429    /// Returns the number of bytes in the blob store that this row holds references to.
1430    ///
1431    /// Used to compute the table's `blob_store_bytes` when reconstructing a snapshot.
1432    ///
1433    /// Even within a single row, this is a conservative overestimate,
1434    /// as a row may contain multiple references to the same large blob.
1435    /// This seems unlikely to occur in practice.
1436    fn blob_store_bytes(&self) -> usize {
1437        let row_data = self.get_row_data();
1438        let (page, _) = self.page_and_offset();
1439        // SAFETY:
1440        // - Existence of a `RowRef` is treated as proof
1441        //   of the row's validity and type information's correctness.
1442        unsafe { self.table.visitor_prog.visit_var_len(row_data) }
1443            .filter(|vlr| vlr.is_large_blob())
1444            .map(|vlr| {
1445                // SAFETY:
1446                // - Because `vlr.is_large_blob`, it points to exactly one granule.
1447                let granule = unsafe { page.iter_var_len_object(vlr.first_granule) }.next().unwrap();
1448                let blob_hash = granule.blob_hash();
1449                let blob = self.blob_store.retrieve_blob(&blob_hash).unwrap();
1450
1451                blob.len()
1452            })
1453            .sum()
1454    }
1455}
1456
1457impl Serialize for RowRef<'_> {
1458    fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
1459        let table = self.table;
1460        let (page, offset) = table.page_and_offset(self.pointer);
1461        // SAFETY: The existence of this `RowRef` is proof that `self.pointer` refers to a valid row in this table.
1462        unsafe { serialize_row_from_page(ser, page, self.blob_store, offset, &table.row_layout) }
1463    }
1464}
1465
1466impl ToBsatn for RowRef<'_> {
1467    /// BSATN-encode the row referred to by `self` into a freshly-allocated `Vec<u8>`.
1468    ///
1469    /// This method will use a [`StaticLayout`] if one is available,
1470    /// and may therefore be faster than calling [`bsatn::to_vec`].
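    ///
    /// A sketch: both the fast path and the general path must produce identical bytes,
    /// as the tests in this file also assert:
    ///
    /// ```ignore
    /// let fast = row_ref.to_bsatn_vec().unwrap();
    /// let general = bsatn::to_vec(&row_ref).unwrap();
    /// assert_eq!(fast, general);
    /// ```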
1471    fn to_bsatn_vec(&self) -> Result<Vec<u8>, BsatnError> {
1472        if let Some(static_layout) = self.static_layout() {
1473            // Use fast path, by first fetching the row data and then using the static layout.
1474            let row = self.get_row_data();
1475            // SAFETY:
1476            // - Existence of a `RowRef` is treated as proof
1477            //   of the row's validity and type information's correctness.
1478            Ok(unsafe { static_layout.serialize_row_into_vec(row) })
1479        } else {
1480            bsatn::to_vec(self)
1481        }
1482    }
1483
1484    /// BSATN-encode the row referred to by `self` into `buf`,
1485    /// pushing `self`'s bytes onto the end of `buf`, similar to [`Vec::extend`].
1486    ///
1487    /// This method will use a [`StaticLayout`] if one is available,
1488    /// and may therefore be faster than calling [`bsatn::to_writer`].
1489    fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> Result<(), BsatnError> {
1490        if let Some(static_layout) = self.static_layout() {
1491            // Use fast path, by first fetching the row data and then using the static layout.
1492            let row = self.get_row_data();
1493            // SAFETY:
1494            // - Existence of a `RowRef` is treated as proof
1495            //   of the row's validity and type information's correctness.
1496            unsafe {
1497                static_layout.serialize_row_extend(buf, row);
1498            }
1499            Ok(())
1500        } else {
1501            // Use the slower, but more general, `bsatn_from` serializer to write the row.
1502            bsatn::to_writer(buf, self)
1503        }
1504    }
1505
1506    fn static_bsatn_size(&self) -> Option<u16> {
1507        self.static_layout().map(|sl| sl.bsatn_length)
1508    }
1509}
1510
1511impl Eq for RowRef<'_> {}
1512impl PartialEq for RowRef<'_> {
1513    fn eq(&self, other: &Self) -> bool {
1514        // Ensure that the layouts are the same
1515        // so that we can use `eq_row_in_page`.
1516        // To do this, we first try address equality on the layouts.
1517        // This should succeed when the rows originate from the same table.
1518        // Otherwise, actually compare the layouts, which is expensive, but unlikely to happen.
1519        let a_ty = self.row_layout();
1520        let b_ty = other.row_layout();
1521        if !(ptr::eq(a_ty, b_ty) || a_ty == b_ty) {
1522            return false;
1523        }
1524        let (page_a, offset_a) = self.page_and_offset();
1525        let (page_b, offset_b) = other.page_and_offset();
1526        let static_layout = self.static_layout();
1527        // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty`
1528        // and `static_layout` is derived from `a_ty`.
1529        unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) }
1530    }
1531}
1532
1533impl PartialEq<ProductValue> for RowRef<'_> {
1534    fn eq(&self, rhs: &ProductValue) -> bool {
1535        let ty = self.row_layout();
1536        let (page, offset) = self.page_and_offset();
1537        // SAFETY: By having `RowRef`,
1538        // we know that `offset` is a valid offset for a row in `page` typed at `ty`.
1539        unsafe { eq_row_in_page_to_pv(self.blob_store, page, offset, rhs, ty) }
1540    }
1541}
1542
1543impl Hash for RowRef<'_> {
1544    fn hash<H: Hasher>(&self, state: &mut H) {
1545        let (page, offset) = self.table.page_and_offset(self.pointer);
1546        let ty = &self.table.row_layout;
1547        // SAFETY: A `RowRef` is a proof that `self.pointer` refers to a live fixed row in `self.table`, so:
1548        // 1. `offset` points at a row in `page` lasting `ty.size()` bytes.
1549        // 2. the row is valid for `ty`.
1550        // 3. for any `vlr: VarLenRef` stored in the row,
1551        //    `vlr.first_offset` is either `NULL` or points to a valid granule in `page`.
1552        unsafe { hash_row_in_page(state, page, self.blob_store, offset, ty) };
1553    }
1554}
1555
1556/// An iterator over all the rows in a table, yielded as [`RowRef`]s.
1557pub struct TableScanIter<'table> {
1558    /// The current page we're yielding rows from.
1559    /// When `None`, the iterator will attempt to advance to the next page, if any.
1560    current_page: Option<FixedLenRowsIter<'table>>,
1561    /// The current page index we are or will visit.
1562    current_page_idx: PageIndex,
1563    /// The table the iterator is yielding rows from.
1564    pub(crate) table: &'table Table,
1565    /// The `BlobStore` that row references may refer into.
1566    pub(crate) blob_store: &'table dyn BlobStore,
1567}
1568
1569impl<'a> Iterator for TableScanIter<'a> {
1570    type Item = RowRef<'a>;
1571
1572    fn next(&mut self) -> Option<Self::Item> {
1573        // This could have been written using `.flat_map`,
1574        // but we don't have `type Foo = impl Iterator<...>;` on stable yet.
1575        loop {
1576            match &mut self.current_page {
1577                // We're currently visiting a page,
1578                Some(iter_fixed_len) => {
1579                    if let Some(page_offset) = iter_fixed_len.next() {
1580                        // There's still at least one row in that page to visit,
1581                        // return a ref to that row.
1582                        let ptr =
1583                            RowPointer::new(false, self.current_page_idx, page_offset, self.table.squashed_offset);
1584
1585                        // SAFETY: `page_offset` came from `iter_fixed_len`, so it must point to a valid row.
1586                        let row_ref = unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) };
1587                        return Some(row_ref);
1588                    } else {
1589                        // We've finished visiting that page, so set `current_page` to `None`,
1590                        // increment `self.current_page_idx` to the index of the next page,
1591                        // and go to the `None` case (1) in the match.
1592                        self.current_page = None;
1593                        self.current_page_idx.0 += 1;
1594                    }
1595                }
1596
1597                // (1) If we aren't currently visiting a page,
1598                // the `else` case in the `Some` match arm
1599                // already incremented `self.current_page_idx`,
1600                // or we're just beginning and so it was initialized as 0.
1601                None => {
1602                    // If there's another page, set `self.current_page` to it,
1603                    // and go to the `Some` case in the match.
1604                    let next_page = self.table.pages().get(self.current_page_idx.idx())?;
1605                    let iter = next_page.iter_fixed_len(self.table.row_size());
1606                    self.current_page = Some(iter);
1607                }
1608            }
1609        }
1610    }
1611}
1612
1613/// A combined table and index,
1614/// allowing direct extraction of an [`IndexScanPointIter`] or [`IndexScanRangeIter`].
1615#[derive(Copy, Clone)]
1616pub struct TableAndIndex<'a> {
1617    table: &'a Table,
1618    blob_store: &'a dyn BlobStore,
1619    index: &'a TableIndex,
1620}
1621
1622impl<'a> TableAndIndex<'a> {
1623    pub fn table(&self) -> &'a Table {
1624        self.table
1625    }
1626
1627    pub fn index(&self) -> &'a TableIndex {
1628        self.index
1629    }
1630
1631    /// Returns an iterator yielding all rows in this index for `key`.
1632    ///
1633    /// Matching is defined by `Ord for AlgebraicValue`.
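    ///
    /// A usage sketch, assuming `table_and_index` wraps an index on a single `i32` column:
    ///
    /// ```ignore
    /// for row_ref in table_and_index.seek_point(&AlgebraicValue::I32(42)) {
    ///     // Every yielded row has `42` in the indexed column.
    /// }
    /// ```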
1634    pub fn seek_point(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
1635        IndexScanPointIter {
1636            table: self.table,
1637            blob_store: self.blob_store,
1638            btree_index_iter: self.index.seek_point(key),
1639        }
1640    }
1641
1642    /// Returns an iterator yielding all rows in this index that fall within `range`.
1643    ///
1644    /// Matching is defined by `Ord for AlgebraicValue`.
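    ///
    /// A usage sketch over a half-open range, again assuming an `i32` key:
    ///
    /// ```ignore
    /// let range = AlgebraicValue::I32(0)..AlgebraicValue::I32(10);
    /// let hits: Vec<_> = table_and_index.seek_range(&range).collect();
    /// ```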
1645    pub fn seek_range(&self, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanRangeIter<'a> {
1646        IndexScanRangeIter {
1647            table: self.table,
1648            blob_store: self.blob_store,
1649            btree_index_iter: self.index.seek_range(range),
1650        }
1651    }
1652}
1653
1654/// An iterator using a [`TableIndex`] to scan a `table`
1655/// for all the [`RowRef`]s matching the specified `key` in the indexed column(s).
1656///
1657/// Matching is defined by `Ord for AlgebraicValue`.
1658pub struct IndexScanPointIter<'a> {
1659    /// The table being scanned for rows.
1660    table: &'a Table,
1661    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1662    blob_store: &'a dyn BlobStore,
1663    /// The iterator performing the index scan yielding row pointers.
1664    btree_index_iter: TableIndexPointIter<'a>,
1665}
1666
1667impl<'a> Iterator for IndexScanPointIter<'a> {
1668    type Item = RowRef<'a>;
1669
1670    fn next(&mut self) -> Option<Self::Item> {
1671        let ptr = self.btree_index_iter.next()?;
1672        // FIXME: Determine if this is correct and if so use `_unchecked`.
1673        // Will a table's index necessarily hold only pointers into that table?
1674        // Edge case: if an index is added during a transaction which then scans that index,
1675        // it appears that the newly-created `TxState` index
1676        // will also hold pointers into the `CommittedState`.
1677        //
1678        // SAFETY: Assuming this is correct,
1679        // `ptr` came from the index, which always holds pointers to valid rows.
1680        self.table.get_row_ref(self.blob_store, ptr)
1681    }
1682}
1683
1684/// An iterator using a [`TableIndex`] to scan a `table`
1685/// for all the [`RowRef`]s matching the specified `range` in the indexed column(s).
1686///
1687/// Matching is defined by `Ord for AlgebraicValue`.
1688pub struct IndexScanRangeIter<'a> {
1689    /// The table being scanned for rows.
1690    table: &'a Table,
1691    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1692    blob_store: &'a dyn BlobStore,
1693    /// The iterator performing the index scan yielding row pointers.
1694    btree_index_iter: TableIndexRangeIter<'a>,
1695}
1696
1697impl<'a> Iterator for IndexScanRangeIter<'a> {
1698    type Item = RowRef<'a>;
1699
1700    fn next(&mut self) -> Option<Self::Item> {
1701        let ptr = self.btree_index_iter.next()?;
1702        // FIXME: Determine if this is correct and if so use `_unchecked`.
1703        // Will a table's index necessarily hold only pointers into that table?
1704        // Edge case: if an index is added during a transaction which then scans that index,
1705        // it appears that the newly-created `TxState` index
1706        // will also hold pointers into the `CommittedState`.
1707        //
1708        // SAFETY: Assuming this is correct,
1709        // `ptr` came from the index, which always holds pointers to valid rows.
1710        self.table.get_row_ref(self.blob_store, ptr)
1711    }
1712}
1713
1714#[derive(Error, Debug, PartialEq, Eq)]
1715#[error("Unique constraint violation '{}' in table '{}': column(s): '{:?}' value: {}", constraint_name, table_name, cols, value.to_satn())]
1716pub struct UniqueConstraintViolation {
1717    pub constraint_name: Box<str>,
1718    pub table_name: Box<str>,
1719    pub cols: Vec<Box<str>>,
1720    pub value: AlgebraicValue,
1721}
1722
1723impl UniqueConstraintViolation {
1724    /// Returns a unique constraint violation error for the given `index`
1725    /// and the `value` that would have been duplicated.
1726    #[cold]
1727    fn build(schema: &TableSchema, index: &TableIndex, index_id: IndexId, value: AlgebraicValue) -> Self {
1728        // Fetch the table name.
1729        let table_name = schema.table_name.clone();
1730
1731        // Fetch the names of the columns used in the index.
1732        let cols = index
1733            .indexed_columns
1734            .iter()
1735            .map(|x| schema.columns()[x.idx()].col_name.clone())
1736            .collect();
1737
1738        // Fetch the name of the index.
1739        let constraint_name = schema
1740            .indexes
1741            .iter()
1742            .find(|i| i.index_id == index_id)
1743            .unwrap()
1744            .index_name
1745            .clone();
1746
1747        Self {
1748            constraint_name,
1749            table_name,
1750            cols,
1751            value,
1752        }
1753    }
1754}
1755
1756// Private API:
1757impl Table {
1758    /// Returns a unique constraint violation error for the given `index`
1759    /// and the `value` that would have been duplicated.
1760    #[cold]
1761    pub fn build_error_unique(
1762        &self,
1763        index: &TableIndex,
1764        index_id: IndexId,
1765        value: AlgebraicValue,
1766    ) -> UniqueConstraintViolation {
1767        let schema = self.get_schema();
1768        UniqueConstraintViolation::build(schema, index, index_id, value)
1769    }
1770
1771    /// Returns a new empty table using the particulars passed.
1772    fn new_with_indexes_capacity(
1773        schema: Arc<TableSchema>,
1774        row_layout: RowTypeLayout,
1775        static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
1776        visitor_prog: VarLenVisitorProgram,
1777        squashed_offset: SquashedOffset,
1778        pointer_map: Option<PointerMap>,
1779    ) -> Self {
1780        Self {
1781            inner: TableInner {
1782                row_layout,
1783                static_layout,
1784                visitor_prog,
1785                pages: Pages::default(),
1786            },
1787            is_scheduler: schema.schedule.is_some(),
1788            schema,
1789            indexes: BTreeMap::new(),
1790            pointer_map,
1791            squashed_offset,
1792            row_count: 0,
1793            blob_store_bytes: BlobNumBytes::default(),
1794        }
1795    }
1796
1797    /// Returns whether the row at `ptr` is present or not.
1798    // TODO: Remove all uses of this method,
1799    //       or more likely, gate them behind `debug_assert!`
1800    //       so they don't have semantic meaning.
1801    //
1802    //       Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
1803    //       `RowPointer` is not content-addressed.
1804    //       This means it is possible to:
1805    //       - have a `RowPointer` A* to row A,
1806    //       - Delete row A,
1807    //       - Insert row B into the same storage as freed from A,
1808    //       - Test `is_row_present(A*)`, which falsely reports that row A is still present.
1809    //
1810    //       In the final interface, this method is superfluous anyways,
1811    //       as `RowPointer` is not part of our public interface.
1812    //       Instead, we will always discover a known-present `RowPointer`
1813    //       during a table scan or index seek.
1814    //       As such, our `delete` and `insert` methods can be `unsafe`
1815    //       and trust that the `RowPointer` is valid.
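    //
    //       A minimal sketch of that hazard (hypothetical flow, not real test code):
    //
    //           let a_ptr = table.insert(bs, &row_a)?.1.pointer();
    //           table.delete(bs, a_ptr, |_| ());
    //           table.insert(bs, &row_b)?;   // may reuse the slot freed from `row_a`
    //           table.is_row_present(a_ptr); // may falsely report `true`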
1816    fn is_row_present(&self, ptr: RowPointer) -> bool {
1817        if self.squashed_offset != ptr.squashed_offset() {
1818            return false;
1819        }
1820        let Some((page, offset)) = self.inner.try_page_and_offset(ptr) else {
1821            return false;
1822        };
1823        page.has_row_offset(self.row_size(), offset)
1824    }
1825
1826    /// Returns the row size for a row in the table.
1827    pub fn row_size(&self) -> Size {
1828        self.row_layout().size()
1829    }
1830
1831    /// Returns the layout for a row in the table.
1832    fn row_layout(&self) -> &RowTypeLayout {
1833        &self.inner.row_layout
1834    }
1835
1836    /// Returns the pages storing the physical rows of this table.
1837    fn pages(&self) -> &Pages {
1838        &self.inner.pages
1839    }
1840
1841    /// Iterates over each [`Page`] in this table, ensuring that its hash is computed before yielding it.
1842    ///
1843    /// Used when capturing a snapshot.
1844    pub fn iter_pages_with_hashes(&mut self) -> impl Iterator<Item = (blake3::Hash, &Page)> {
1845        self.inner.pages.iter_mut().map(|page| {
1846            let hash = page.save_or_get_content_hash();
1847            (hash, &**page)
1848        })
1849    }
1850
1851    /// Returns the number of pages storing the physical rows of this table.
1852    fn num_pages(&self) -> usize {
1853        self.inner.pages.len()
1854    }
1855
1856    /// Returns the [`StaticLayout`] for this table, if any.
1857    pub(crate) fn static_layout(&self) -> Option<&StaticLayout> {
1858        self.inner.static_layout.as_ref().map(|(s, _)| s)
1859    }
1860
1861    /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them.
1862    ///
1863    /// Called when restoring from a snapshot, after installing the pages
1864    /// and after computing the row count,
1865    /// since snapshots do not save the pointer map.
1866    fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) {
1867        // TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`.
1868        // Alternatively, do this at the same time as `compute_row_count`.
1869        let ptrs = self
1870            .scan_rows(blob_store)
1871            .map(|row_ref| (row_ref.row_hash(), row_ref.pointer()))
1872            .collect::<PointerMap>();
1873        self.pointer_map = Some(ptrs);
1874    }
1875
1876    /// Compute and store `self.row_count` and `self.blob_store_bytes`
1877    /// by iterating over all the rows in `self` and counting them.
1878    ///
1879    /// Called when restoring from a snapshot after installing the pages,
1880    /// since snapshots do not save this metadata.
1881    fn compute_row_count(&mut self, blob_store: &dyn BlobStore) {
1882        let mut row_count = 0;
1883        let mut blob_store_bytes = 0;
1884        for row in self.scan_rows(blob_store) {
1885            row_count += 1;
1886            blob_store_bytes += row.blob_store_bytes();
1887        }
1888        self.row_count = row_count as u64;
1889        self.blob_store_bytes = blob_store_bytes.into();
1890    }
1891}
1892
1893#[cfg(test)]
1894pub(crate) mod test {
1895    use super::*;
1896    use crate::blob_store::{HashMapBlobStore, NullBlobStore};
1897    use crate::page::tests::hash_unmodified_save_get;
1898    use crate::var_len::VarLenGranule;
1899    use proptest::prelude::*;
1900    use proptest::test_runner::TestCaseResult;
1901    use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder};
1902    use spacetimedb_primitives::{col_list, TableId};
1903    use spacetimedb_sats::bsatn::to_vec;
1904    use spacetimedb_sats::proptest::{generate_typed_row, generate_typed_row_vec};
1905    use spacetimedb_sats::{product, AlgebraicType, ArrayValue};
1906    use spacetimedb_schema::def::{BTreeAlgorithm, ModuleDef};
1907    use spacetimedb_schema::schema::Schema as _;
1908
1909    /// Create a `Table` from a `ProductType` without validation.
1910    pub(crate) fn table(ty: ProductType) -> Table {
1911        // Use a fast path here to avoid slowing down Miri in the proptests.
1912        // Does not perform validation.
1913        let schema = TableSchema::from_product_type(ty);
1914        Table::new(schema.into(), SquashedOffset::COMMITTED_STATE)
1915    }
1916
1917    #[test]
1918    fn unique_violation_error() {
1919        let table_name = "UniqueIndexed";
1920        let index_name = "UniqueIndexed_unique_col_idx_btree";
1921        let mut builder = RawModuleDefV9Builder::new();
1922        builder
1923            .build_table_with_new_type(
1924                table_name,
1925                ProductType::from([("unique_col", AlgebraicType::I32), ("other_col", AlgebraicType::I32)]),
1926                true,
1927            )
1928            .with_unique_constraint(0)
1929            .with_index(
1930                RawIndexAlgorithm::BTree { columns: col_list![0] },
1931                "accessor_name_doesnt_matter",
1932            );
1933
1934        let def: ModuleDef = builder.finish().try_into().expect("Failed to build schema");
1935
1936        let schema = TableSchema::from_module_def(&def, def.table(table_name).unwrap(), (), TableId::SENTINEL);
1937        assert_eq!(schema.indexes.len(), 1);
1938        let index_schema = schema.indexes[0].clone();
1939
1940        let mut table = Table::new(schema.into(), SquashedOffset::COMMITTED_STATE);
1941        let cols = ColList::new(0.into());
1942        let algo = BTreeAlgorithm { columns: cols.clone() }.into();
1943
1944        let index = table.new_index(&algo, true).unwrap();
1945        // SAFETY: Index was derived from `table`.
1946        unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) };
1947
1948        // Reserve a page so that we can check the hash.
1949        let pi = table.inner.pages.reserve_empty_page(table.row_size()).unwrap();
1950        let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
1951
1952        // Insert the row (0, 0).
1953        table
1954            .insert(&mut NullBlobStore, &product![0i32, 0i32])
1955            .expect("Initial insert failed");
1956
1957        // Inserting cleared the hash.
1958        let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
1959        assert_ne!(hash_pre_ins, hash_post_ins);
1960
1961        // Try to insert the row (0, 1), and assert that we get the expected error.
1962        match table.insert(&mut NullBlobStore, &product![0i32, 1i32]) {
1963            Ok(_) => panic!("Second insert with same unique value succeeded"),
1964            Err(InsertError::IndexError(UniqueConstraintViolation {
1965                constraint_name,
1966                table_name,
1967                cols,
1968                value,
1969            })) => {
1970                assert_eq!(&*constraint_name, index_name);
1971                assert_eq!(&*table_name, "UniqueIndexed");
1972                assert_eq!(cols.iter().map(|c| c.to_string()).collect::<Vec<_>>(), &["unique_col"]);
1973                assert_eq!(value, AlgebraicValue::I32(0));
1974            }
1975            Err(e) => panic!("Expected UniqueConstraintViolation but found {:?}", e),
1976        }
1977
1978        // The second insert cleared the hash even though it hit a constraint violation,
1979        // as constraint checking is done after physical insertion and then rolled back.
1980        assert_eq!(table.inner.pages[pi].unmodified_hash(), None);
1981    }
1982
1983    fn insert_retrieve_body(ty: impl Into<ProductType>, val: impl Into<ProductValue>) -> TestCaseResult {
1984        let val = val.into();
1985        let mut blob_store = HashMapBlobStore::default();
1986        let mut table = table(ty.into());
1987        let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
1988        let hash = hash.unwrap();
1989        prop_assert_eq!(row.row_hash(), hash);
1990        let ptr = row.pointer();
1991        prop_assert_eq!(table.pointers_for(hash), &[ptr]);
1992
1993        prop_assert_eq!(table.inner.pages.len(), 1);
1994        prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
1995
1996        let row_ref = table.get_row_ref(&blob_store, ptr).unwrap();
1997        prop_assert_eq!(row_ref.to_product_value(), val.clone());
1998        let bsatn_val = to_vec(&val).unwrap();
1999        prop_assert_eq!(&bsatn_val, &to_vec(&row_ref).unwrap());
2000        prop_assert_eq!(&bsatn_val, &row_ref.to_bsatn_vec().unwrap());
2001
2002        prop_assert_eq!(
2003            &table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(),
2004            &[ptr]
2005        );
2006
2007        Ok(())
2008    }
2009
2010    #[test]
2011    fn repro_serialize_bsatn_empty_array() {
2012        let ty = AlgebraicType::array(AlgebraicType::U64);
2013        let arr = ArrayValue::from(Vec::<u64>::new().into_boxed_slice());
2014        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2015    }
2016
2017    #[test]
2018    fn repro_serialize_bsatn_debug_assert() {
2019        let ty = AlgebraicType::array(AlgebraicType::U64);
2020        let arr = ArrayValue::from((0..130u64).collect::<Box<_>>());
2021        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2022    }
2023
2024    fn reconstruct_index_num_key_bytes(table: &Table, blob_store: &dyn BlobStore, index_id: IndexId) -> u64 {
2025        let index = table.get_index_by_id(index_id).unwrap();
2026
2027        index
2028            .seek_range(&(..))
2029            .map(|row_ptr| {
2030                let row_ref = table.get_row_ref(blob_store, row_ptr).unwrap();
2031                let key = row_ref.project(&index.indexed_columns).unwrap();
2032                crate::table_index::KeySize::key_size_in_bytes(&key) as u64
2033            })
2034            .sum()
2035    }
2036
2037    /// Given a row type `ty`, a set of rows of that type `vals`,
2038    /// and a set of columns within that type `indexed_columns`,
2039    /// populate a table with `vals`, add an index on the `indexed_columns`,
2040    /// and perform various assertions that the reported index size metrics are correct.
2041    fn test_index_size_reporting(
2042        ty: ProductType,
2043        vals: Vec<ProductValue>,
2044        indexed_columns: ColList,
2045    ) -> Result<(), TestCaseError> {
2046        let mut blob_store = HashMapBlobStore::default();
2047        let mut table = table(ty.clone());
2048
2049        for row in &vals {
2050            prop_assume!(table.insert(&mut blob_store, row).is_ok());
2051        }
2052
2053        // We haven't added any indexes yet, so there should be 0 rows in indexes.
2054        prop_assert_eq!(table.num_rows_in_indexes(), 0);
2055
2056        let index_id = IndexId(0);
2057
2058        let algo = BTreeAlgorithm {
2059            columns: indexed_columns.clone(),
2060        }
2061        .into();
2062        let index = TableIndex::new(&ty, &algo, false).unwrap();
2063        // Add an index on the `indexed_columns`.
2064        // Safety:
2065        // We're using `ty` as the row type for both `table` and the new index.
2066        unsafe { table.insert_index(&blob_store, index_id, index) };
2067
2068        // We have one index, which should be fully populated,
2069        // so in total we should have the same number of rows in indexes as we have rows.
2070        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows());
2071
2072        let index = table.get_index_by_id(index_id).unwrap();
2073
2074        // One index, so table's reporting of bytes used should match that index's reporting.
2075        prop_assert_eq!(table.bytes_used_by_index_keys(), index.num_key_bytes());
2076
2077        // Walk all the rows in the index, sum their key size,
2078        // and assert it matches the `index.num_key_bytes()`
2079        prop_assert_eq!(
2080            index.num_key_bytes(),
2081            reconstruct_index_num_key_bytes(&table, &blob_store, index_id)
2082        );
2083
2084        // Walk all the rows we inserted, project them to the cols that will be their keys,
2085        // sum their key size,
2086        // and assert it matches the `index.num_key_bytes()`
2087        let key_size_in_pvs = vals
2088            .iter()
2089            .map(|row| crate::table_index::KeySize::key_size_in_bytes(&row.project(&indexed_columns).unwrap()) as u64)
2090            .sum();
2091        prop_assert_eq!(index.num_key_bytes(), key_size_in_pvs);
2092
2093        let algo = BTreeAlgorithm {
2094            columns: indexed_columns,
2095        }
2096        .into();
2097        let index = TableIndex::new(&ty, &algo, false).unwrap();
2098        // Add a duplicate of the same index, so we can check that all above quantities double.
2099        // Safety:
2100        // As above, we're using `ty` as the row type for both `table` and the new index.
2101        unsafe { table.insert_index(&blob_store, IndexId(1), index) };
2102
2103        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2);
2104        prop_assert_eq!(table.bytes_used_by_index_keys(), key_size_in_pvs * 2);
2105
2106        Ok(())
2107    }
2108
2109    proptest! {
2110        #![proptest_config(ProptestConfig { max_shrink_iters: 0x10000000, ..Default::default() })]
2111
2112        #[test]
2113        fn insert_retrieve((ty, val) in generate_typed_row()) {
2114            insert_retrieve_body(ty, val)?;
2115        }
2116
2117        #[test]
2118        fn insert_delete_removed_from_pointer_map((ty, val) in generate_typed_row()) {
2119            let mut blob_store = HashMapBlobStore::default();
2120            let mut table = table(ty);
2121            let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2122            let hash = hash.unwrap();
2123            prop_assert_eq!(row.row_hash(), hash);
2124            let ptr = row.pointer();
2125            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2126
2127            prop_assert_eq!(table.inner.pages.len(), 1);
2128            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2129            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2130            prop_assert_eq!(table.row_count, 1);
2131
2132            let hash_pre_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2133
2134            table.delete(&mut blob_store, ptr, |_| ());
2135
2136            let hash_post_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2137            assert_ne!(hash_pre_del, hash_post_del);
2138
2139            prop_assert_eq!(table.pointers_for(hash), &[]);
2140
2141            prop_assert_eq!(table.inner.pages.len(), 1);
2142            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 0);
2143            prop_assert_eq!(table.row_count, 0);
2144
2145            prop_assert!(&table.scan_rows(&blob_store).next().is_none());
2146        }
2147
2148        #[test]
2149        fn insert_duplicate_set_semantic((ty, val) in generate_typed_row()) {
2150            let mut blob_store = HashMapBlobStore::default();
2151            let mut table = table(ty);
2152
2153            let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2154            let hash = hash.unwrap();
2155            prop_assert_eq!(row.row_hash(), hash);
2156            let ptr = row.pointer();
2157            prop_assert_eq!(table.inner.pages.len(), 1);
2158            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2159            prop_assert_eq!(table.row_count, 1);
2160            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2161
2162            let blob_uses = blob_store.usage_counter();
2163
2164            let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2165
2166            prop_assert!(table.insert(&mut blob_store, &val).is_err());
2167
2168            // Hash was cleared and is different despite failure to insert.
2169            let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2170            assert_ne!(hash_pre_ins, hash_post_ins);
2171
2172            prop_assert_eq!(table.row_count, 1);
2173            prop_assert_eq!(table.inner.pages.len(), 1);
2174            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2175
2176            let blob_uses_after = blob_store.usage_counter();
2177
2178            prop_assert_eq!(blob_uses_after, blob_uses);
2179            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2180            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2181        }
2182
2183        #[test]
2184        fn insert_bsatn_same_as_pv((ty, val) in generate_typed_row()) {
2185            let mut bs_pv = HashMapBlobStore::default();
2186            let mut table_pv = table(ty.clone());
2187            let res_pv = table_pv.insert(&mut bs_pv, &val);
2188
2189            let mut bs_bsatn = HashMapBlobStore::default();
2190            let mut table_bsatn = table(ty);
2191            let res_bsatn = insert_bsatn(&mut table_bsatn, &mut bs_bsatn, &val);
2192
2193            prop_assert_eq!(res_pv, res_bsatn);
2194            prop_assert_eq!(bs_pv, bs_bsatn);
2195            prop_assert_eq!(table_pv, table_bsatn);
2196        }
2197
2198        #[test]
2199        fn row_size_reporting_matches_slow_implementations((ty, vals) in generate_typed_row_vec(128, 2048)) {
2200            let mut blob_store = HashMapBlobStore::default();
2201            let mut table = table(ty.clone());
2202
2203            for row in &vals {
2204                prop_assume!(table.insert(&mut blob_store, row).is_ok());
2205            }
2206
2207            prop_assert_eq!(table.bytes_used_by_rows(), table.reconstruct_bytes_used_by_rows());
2208            prop_assert_eq!(table.num_rows(), table.reconstruct_num_rows());
2209            prop_assert_eq!(table.num_rows(), vals.len() as u64);
2210
2211            // TODO(testing): Determine if there's a meaningful way to test that the blob store reporting is correct.
2212            // I (pgoldman 2025-01-27) doubt it, as the test would be "visit every blob and sum their size,"
2213            // which is already what the actual implementation does.
2214        }
2215
2216        #[test]
2217        fn index_size_reporting_matches_slow_implementations_single_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2218            prop_assume!(!ty.elements.is_empty());
2219
2220            test_index_size_reporting(ty, vals, ColList::from(ColId(0)))?;
2221        }
2222
2223        #[test]
2224        fn index_size_reporting_matches_slow_implementations_two_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2225            prop_assume!(ty.elements.len() >= 2);
2226
2228            test_index_size_reporting(ty, vals, ColList::from([ColId(0), ColId(1)]))?;
2229        }
2230    }
2231
2232    fn insert_bsatn<'a>(
2233        table: &'a mut Table,
2234        blob_store: &'a mut dyn BlobStore,
2235        val: &ProductValue,
2236    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
2237        let row = &to_vec(&val).unwrap();
2238
2239        // Optimistically insert the `row` before checking any constraints
2240        // under the assumption that errors (unique constraint & set semantic violations) are rare.
2241        let (row_ref, blob_bytes) = table.insert_physically_bsatn(blob_store, row)?;
2242        let row_ptr = row_ref.pointer();
2243
2244        // Confirm the insertion, checking any constraints, removing the physical row on error.
2245        // SAFETY: We just inserted `ptr`, so it must be present.
2246        let (hash, row_ptr) = unsafe { table.confirm_insertion(blob_store, row_ptr, blob_bytes) }?;
2247        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
2248        let row_ref = unsafe { table.inner.get_row_ref_unchecked(blob_store, row_ptr) };
2249        Ok((hash, row_ref))
2250    }
2251
2252    // Compare `scan_rows` against a simpler implementation.
2253    #[test]
2254    fn table_scan_iter_eq_flatmap() {
2255        let mut blob_store = HashMapBlobStore::default();
2256        let mut table = table(AlgebraicType::U64.into());
2257        for v in 0..2u64.pow(14) {
2258            table.insert(&mut blob_store, &product![v]).unwrap();
2259        }
2260
2261        let complex = table.scan_rows(&blob_store).map(|r| r.pointer());
2262        let simple = table
2263            .inner
2264            .pages
2265            .iter()
2266            .zip((0..).map(PageIndex))
2267            .flat_map(|(page, pi)| {
2268                page.iter_fixed_len(table.row_size())
2269                    .map(move |po| RowPointer::new(false, pi, po, table.squashed_offset))
2270            });
2271        assert!(complex.eq(simple));
2272    }
2273
2274    #[test]
2275    #[should_panic]
2276    fn read_row_unaligned_page_offset_soundness() {
2277        // Insert a `u64` into a table.
2278        let pt = AlgebraicType::U64.into();
2279        let pv = product![42u64];
2280        let mut table = table(pt);
2281        let blob_store = &mut NullBlobStore;
2282        let (_, row_ref) = table.insert(blob_store, &pv).unwrap();
2283
2284        // Manipulate the page offset to 1 instead of 0.
2285        // This now points into the "middle" of a row.
2286        let ptr = row_ref.pointer().with_page_offset(PageOffset(1));
2287
2288        // We expect this to panic.
2289        // Miri should not have any issue with this call either.
2290        table.get_row_ref(&NullBlobStore, ptr).unwrap().to_product_value();
2291    }
2292
2293    #[test]
2294    fn test_blob_store_bytes() {
2295        let pt: ProductType = [AlgebraicType::String, AlgebraicType::I32].into();
2296        let blob_store = &mut HashMapBlobStore::default();
2297        let mut insert =
2298            |table: &mut Table, string, num| table.insert(blob_store, &product![string, num]).unwrap().1.pointer();
2299        let mut table1 = table(pt.clone());
2300
2301        // Insert short string, `blob_store_bytes` should be 0.
2302        let short_str = std::str::from_utf8(&[98; 6]).unwrap();
2303        let short_row_ptr = insert(&mut table1, short_str, 0);
2304        assert_eq!(table1.blob_store_bytes.0, 0);
2305
2306        // Insert long string, `blob_store_bytes` should be the length of the string.
2307        const BLOB_OBJ_LEN: BlobNumBytes = BlobNumBytes(VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD + 1);
2308        let long_str = std::str::from_utf8(&[98; BLOB_OBJ_LEN.0]).unwrap();
2309        let long_row_ptr = insert(&mut table1, long_str, 0);
2310        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2311
2312        // Insert previous long string in the same table,
2313        // `blob_store_bytes` should count the length twice,
2314        // even though `HashMapBlobStore` deduplicates it.
2315        let long_row_ptr2 = insert(&mut table1, long_str, 1);
2316        const BLOB_OBJ_LEN_2X: BlobNumBytes = BlobNumBytes(BLOB_OBJ_LEN.0 * 2);
2317        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2318
2319        // Insert previous long string in a new table,
2320        // `blob_store_bytes` should show the length,
2321        // even though `HashMapBlobStore` deduplicates it.
2322        let mut table2 = table(pt);
2323        let _ = insert(&mut table2, long_str, 0);
2324        assert_eq!(table2.blob_store_bytes, BLOB_OBJ_LEN);
2325
2326        // Delete `short_str` row. This should not affect the byte count.
2327        table1.delete(blob_store, short_row_ptr, |_| ()).unwrap();
2328        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2329
2330        // Delete the first long string row. This gets us down to `BLOB_OBJ_LEN` (we had 2x before).
2331        table1.delete(blob_store, long_row_ptr, |_| ()).unwrap();
2332        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2333
2334        // Delete the second long string row. This gets us down to 0 (we've now deleted both).
2335        table1.delete(blob_store, long_row_ptr2, |_| ()).unwrap();
2336        assert_eq!(table1.blob_store_bytes, 0.into());
2337    }
2338
2339    /// Assert that calling `get_row_ref` to get a row ref to a non-existent `RowPointer`
2340    /// does not panic.
2341    #[test]
2342    fn get_row_ref_no_panic() {
2343        let blob_store = &mut HashMapBlobStore::default();
2344        let table = table([AlgebraicType::String, AlgebraicType::I32].into());
2345
2346        // This row pointer has an incorrect `SquashedOffset`, and so does not point into `table`.
2347        assert!(table
2348            .get_row_ref(
2349                blob_store,
2350                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::TX_STATE),
2351            )
2352            .is_none());
2353
2354        // This row pointer has the correct `SquashedOffset`, but points out-of-bounds within `table`.
2355        assert!(table
2356            .get_row_ref(
2357                blob_store,
2358                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::COMMITTED_STATE),
2359            )
2360            .is_none());
2361    }
2362}