spacetimedb_table/
table.rs

1use crate::{
2    bflatn_to::write_row_to_pages_bsatn,
3    layout::AlgebraicTypeLayout,
4    static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator},
5    table_index::TableIndexPointIter,
6};
7
8use super::{
9    bflatn_from::serialize_row_from_page,
10    bflatn_to::{write_row_to_pages, Error},
11    blob_store::BlobStore,
12    eq::eq_row_in_page,
13    eq_to_pv::eq_row_in_page_to_pv,
14    indexes::{Bytes, PageIndex, PageOffset, RowHash, RowPointer, Size, SquashedOffset, PAGE_DATA_SIZE},
15    layout::RowTypeLayout,
16    page::{FixedLenRowsIter, Page},
17    pages::Pages,
18    pointer_map::PointerMap,
19    read_column::{ReadColumn, TypeError},
20    row_hash::hash_row_in_page,
21    row_type_visitor::{row_type_visitor, VarLenVisitorProgram},
22    static_assert_size,
23    static_layout::StaticLayout,
24    table_index::{TableIndex, TableIndexRangeIter},
25    var_len::VarLenMembers,
26    MemoryUsage,
27};
28use core::ops::RangeBounds;
29use core::{fmt, ptr};
30use core::{
31    hash::{Hash, Hasher},
32    hint::unreachable_unchecked,
33};
34use derive_more::{Add, AddAssign, From, Sub, SubAssign};
35use enum_as_inner::EnumAsInner;
36use smallvec::SmallVec;
37use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned};
38use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId};
39use spacetimedb_sats::{
40    algebraic_value::ser::ValueSerializer,
41    bsatn::{self, ser::BsatnError, ToBsatn},
42    i256,
43    product_value::InvalidFieldError,
44    satn::Satn,
45    ser::{Serialize, Serializer},
46    u256, AlgebraicValue, ProductType, ProductValue,
47};
48use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema, type_for_generate::PrimitiveType};
49use std::{
50    collections::{btree_map, BTreeMap},
51    sync::Arc,
52};
53use thiserror::Error;
54
55/// The number of bytes used by, added to, or removed from a [`Table`]'s share of a [`BlobStore`].
56#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign, SubAssign)]
57pub struct BlobNumBytes(usize);
58
59impl MemoryUsage for BlobNumBytes {}
60
61pub type SeqIdList = SmallVec<[SequenceId; 4]>;
62static_assert_size!(SeqIdList, 24);
63
64/// A database table containing the row schema, the rows, and indices.
65///
66/// The table stores its rows in a page manager
67/// and uses an internal map to ensure that no identical row is stored more than once.
68#[derive(Debug, PartialEq, Eq)]
69pub struct Table {
70    /// Page manager and row layout grouped together, for `RowRef` purposes.
71    inner: TableInner,
72    /// Maps `RowHash -> [RowPointer]` where a [`RowPointer`] points into `pages`.
73    /// A [`PointerMap`] is effectively a specialized unique index on all the columns.
74    ///
75    /// In tables without any other unique constraints,
76    /// the pointer map is used to enforce set semantics,
77    /// i.e. to prevent duplicate rows.
78    /// If `self.indexes` contains at least one unique index,
79    /// duplicate rows are impossible regardless, so this will be `None`.
80    pointer_map: Option<PointerMap>,
81    /// The indices associated with a set of columns of the table.
82    pub indexes: BTreeMap<IndexId, TableIndex>,
83    /// The schema of the table, from which the type, and other details are derived.
84    pub schema: Arc<TableSchema>,
85    /// `SquashedOffset::TX_STATE` or `SquashedOffset::COMMITTED_STATE`
86    /// depending on whether this is a tx scratchpad table
87    /// or a committed table.
88    squashed_offset: SquashedOffset,
89    /// Store number of rows present in table.
90    pub row_count: u64,
91    /// Stores the sum total number of bytes that each blob object in the table occupies.
92    ///
93    /// Note that the [`HashMapBlobStore`] does ref-counting and de-duplication,
94    /// but this sum will count an object each time its hash is mentioned, rather than just once.
95    blob_store_bytes: BlobNumBytes,
96    /// Indicates whether this is a scheduler table or not.
97    ///
98    /// This is an optimization to avoid checking the schema in, e.g., `InstanceEnv::{insert, update}`.
99    is_scheduler: bool,
100}
101
102/// The part of a `Table` concerned only with storing rows.
103///
104/// Separated from the "outer" parts of `Table`, especially the `indexes`,
105/// so that `RowRef` can borrow only the `TableInner`,
106/// while other mutable references to the `indexes` exist.
107/// This is necessary because index insertions and deletions take a `RowRef` as an argument,
108/// from which they [`ReadColumn::read_column`] their keys.
109#[derive(Debug, PartialEq, Eq)]
110pub(crate) struct TableInner {
111    /// The type of rows this table stores, with layout information included.
112    row_layout: RowTypeLayout,
113    /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion,
114    /// if the [`RowTypeLayout`] has a static BSATN length and layout.
115    ///
116    /// A [`StaticBsatnValidator`] is also included.
117    /// It's used to validate BSATN-encoded rows before converting to BFLATN.
118    static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
119    /// The visitor program for `row_layout`.
120    ///
121    /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it.
122    visitor_prog: VarLenVisitorProgram,
123    /// The page manager that holds rows
124    /// including both their fixed and variable components.
125    pages: Pages,
126}
127
128impl TableInner {
129    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
130    ///
131    /// # Safety
132    ///
133    /// The requirement is that `table.is_row_present(ptr)` must hold,
134    /// where `table` is the `Table` which contains this `TableInner`.
135    /// That is, `ptr` must refer to a row within `self`
136    /// which was previously inserted and has not been deleted since.
137    ///
138    /// This means:
139    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
140    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
141    ///   and must refer to a valid, live row in that page.
142    /// - The `SquashedOffset` of `ptr` must match the enclosing table's `table.squashed_offset`.
143    ///
144    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
145    /// and has not been passed to [`Table::delete(table, ..)`]
146    /// is sufficient to demonstrate all of these properties.
147    unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
148        // SAFETY: Forward caller requirements.
149        unsafe { RowRef::new(self, blob_store, ptr) }
150    }
151
152    fn try_page_and_offset(&self, ptr: RowPointer) -> Option<(&Page, PageOffset)> {
153        (ptr.page_index().idx() < self.pages.len()).then(|| (&self.pages[ptr.page_index()], ptr.page_offset()))
154    }
155
156    /// Returns the page and page offset that `ptr` points to.
157    fn page_and_offset(&self, ptr: RowPointer) -> (&Page, PageOffset) {
158        self.try_page_and_offset(ptr).unwrap()
159    }
160}
161
162static_assert_size!(Table, 264);
163
164impl MemoryUsage for Table {
165    fn heap_usage(&self) -> usize {
166        let Self {
167            inner,
168            pointer_map,
169            indexes,
170            // MEMUSE: intentionally ignoring schema
171            schema: _,
172            squashed_offset,
173            row_count,
174            blob_store_bytes,
175            is_scheduler,
176        } = self;
177        inner.heap_usage()
178            + pointer_map.heap_usage()
179            + indexes.heap_usage()
180            + squashed_offset.heap_usage()
181            + row_count.heap_usage()
182            + blob_store_bytes.heap_usage()
183            + is_scheduler.heap_usage()
184    }
185}
186
187impl MemoryUsage for TableInner {
188    fn heap_usage(&self) -> usize {
189        let Self {
190            row_layout,
191            static_layout,
192            visitor_prog,
193            pages,
194        } = self;
195        row_layout.heap_usage() + static_layout.heap_usage() + visitor_prog.heap_usage() + pages.heap_usage()
196    }
197}
198
199/// There was already a row with the same value.
200#[derive(Error, Debug, PartialEq, Eq)]
201#[error("Duplicate insertion of row {0:?} violates set semantics")]
202pub struct DuplicateError(pub RowPointer);
203
204/// Various errors that can happen on table insertion.
205#[derive(Error, Debug, PartialEq, Eq, EnumAsInner)]
206pub enum InsertError {
207    /// There was already a row with the same value.
208    #[error(transparent)]
209    Duplicate(#[from] DuplicateError),
210
211    /// Couldn't write the row to the page manager.
212    #[error(transparent)]
213    Bflatn(#[from] super::bflatn_to::Error),
214
215    /// Some index-related error occurred.
216    #[error(transparent)]
217    IndexError(#[from] UniqueConstraintViolation),
218}
219
220/// Errors that can occur while trying to read a value via bsatn.
221#[derive(Error, Debug)]
222pub enum ReadViaBsatnError {
223    #[error(transparent)]
224    BSatnError(#[from] BsatnError),
225
226    #[error(transparent)]
227    DecodeError(#[from] DecodeError),
228}
229
230// Public API:
231impl Table {
232    /// Creates a new empty table with the given `schema` and `squashed_offset`.
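    ///
    /// # Example
    ///
    /// A minimal construction sketch (not a doctest; assumes a `schema: Arc<TableSchema>`
    /// describing the desired row type is already in scope):
    ///
    /// ```ignore
    /// // A scratchpad table for the current transaction:
    /// let tx_table = Table::new(schema.clone(), SquashedOffset::TX_STATE);
    /// // A table holding committed state:
    /// let committed_table = Table::new(schema, SquashedOffset::COMMITTED_STATE);
    /// ```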
233    pub fn new(schema: Arc<TableSchema>, squashed_offset: SquashedOffset) -> Self {
234        let row_layout: RowTypeLayout = schema.get_row_type().clone().into();
235        let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout)));
236        let visitor_prog = row_type_visitor(&row_layout);
237        // By default, we start off with an empty pointer map,
238        // which is removed when the first unique index is added.
239        let pm = Some(PointerMap::default());
240        Self::new_with_indexes_capacity(schema, row_layout, static_layout, visitor_prog, squashed_offset, pm)
241    }
242
243    /// Returns whether this is a scheduler table.
244    pub fn is_scheduler(&self) -> bool {
245        self.is_scheduler
246    }
247
248    /// Check if the `row` conflicts with any unique index on `self`,
249    /// and if there is a conflict, return `Err`.
250    ///
251    /// `is_deleted` is a predicate which, for a given row pointer,
252    /// returns true if and only if that row should be ignored.
253    /// While checking unique constraints against the committed state,
254    /// `MutTxId::insert` will ignore rows which are listed in the delete table.
255    ///
256    /// # Safety
257    ///
258    /// `row.row_layout() == self.row_layout()` must hold.
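    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest; assumes a `table: &Table` and a `row: RowRef<'_>`
    /// whose layout matches `table`'s, and treats no rows as deleted):
    ///
    /// ```ignore
    /// // SAFETY: `row` has the same layout as `table`.
    /// unsafe { table.check_unique_constraints(row, |indexes| indexes, |_ptr| false) }?;
    /// ```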
259    pub unsafe fn check_unique_constraints<'a, I: Iterator<Item = (&'a IndexId, &'a TableIndex)>>(
260        &'a self,
261        row: RowRef<'_>,
262        adapt: impl FnOnce(btree_map::Iter<'a, IndexId, TableIndex>) -> I,
263        mut is_deleted: impl FnMut(RowPointer) -> bool,
264    ) -> Result<(), UniqueConstraintViolation> {
265        for (&index_id, index) in adapt(self.indexes.iter()).filter(|(_, index)| index.is_unique()) {
266        // SAFETY: Caller promised that `row` has the same layout as `self`.
267            // Thus, as `index.indexed_columns` is in-bounds of `self`'s layout,
268            // it's also in-bounds of `row`'s layout.
269            let value = unsafe { row.project_unchecked(&index.indexed_columns) };
270            if index.seek_point(&value).next().is_some_and(|ptr| !is_deleted(ptr)) {
271                return Err(self.build_error_unique(index, index_id, value));
272            }
273        }
274        Ok(())
275    }
276
277    /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
278    ///
279    /// On success, returns the hash, if any, of the newly-inserted row,
280    /// and a `RowRef` referring to the row.
281    /// The hash is only computed if this table has a [`PointerMap`],
282    /// i.e., does not have any unique indexes.
283    /// If the table has unique indexes,
284    /// the returned `Option<RowHash>` will be `None`.
285    ///
286    /// When a row equal to `row` already exists in `self`,
287    /// returns `InsertError::Duplicate(existing_row_pointer)`,
288    /// where `existing_row_pointer` is a `RowPointer` which identifies the existing row.
289    /// In this case, the duplicate is not inserted,
290    /// but internal data structures may be altered in ways that affect performance and fragmentation.
291    ///
292    /// TODO(error-handling): describe errors from `write_row_to_pages` and return meaningful errors.
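    ///
    /// # Example
    ///
    /// A minimal usage sketch (not a doctest; assumes a `table: Table`, a matching
    /// `blob_store: &mut dyn BlobStore`, and a `row: ProductValue` of the table's row type):
    ///
    /// ```ignore
    /// match table.insert(blob_store, &row) {
    ///     Ok((_hash, row_ref)) => {
    ///         // Inserted successfully; keep the pointer for later look-ups.
    ///         let _ptr = row_ref.pointer();
    ///     }
    ///     Err(InsertError::Duplicate(DuplicateError(_existing))) => {
    ///         // An equal row already exists; nothing was inserted.
    ///     }
    ///     Err(e) => panic!("insertion failed: {e}"),
    /// }
    /// ```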
293    pub fn insert<'a>(
294        &'a mut self,
295        blob_store: &'a mut dyn BlobStore,
296        row: &ProductValue,
297    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
298        // Optimistically insert the `row` before checking any constraints
299        // under the assumption that errors (unique constraint & set semantic violations) are rare.
300        let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?;
301        let row_ptr = row_ref.pointer();
302
303        // Confirm the insertion, checking any constraints, removing the physical row on error.
304        // SAFETY: We just inserted `ptr`, so it must be present.
305        // Re. `CHECK_SAME_ROW = true`,
306        // where `insert` is called, we are not dealing with transactions,
307        // and we already know there cannot be a duplicate row error,
308        // but we check just in case.
309        let (hash, row_ptr) = unsafe { self.confirm_insertion::<true>(blob_store, row_ptr, blob_bytes) }?;
310        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
311        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, row_ptr) };
312        Ok((hash, row_ref))
313    }
314
315    /// Physically inserts `row` into the page
316    /// without inserting it logically into the pointer map.
317    ///
318    /// This is useful when we need to insert a row temporarily to get back a `RowPointer`.
319    /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
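    ///
    /// A sketch of the intended pattern (not a doctest; mirrors what
    /// [`Table::delete_equal_row`] does internally):
    ///
    /// ```ignore
    /// let (temp_row, _) = table.insert_physically_pv(blob_store, &row)?;
    /// let temp_ptr = temp_row.pointer();
    /// // ... use `temp_ptr`, e.g. to probe an index ...
    /// // SAFETY: `temp_ptr` was just inserted, so it is a valid, live row.
    /// unsafe { table.delete_internal_skip_pointer_map(blob_store, temp_ptr) };
    /// ```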
320    pub fn insert_physically_pv<'a>(
321        &'a mut self,
322        blob_store: &'a mut dyn BlobStore,
323        row: &ProductValue,
324    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
325        // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
326        // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
327        let (ptr, blob_bytes) = unsafe {
328            write_row_to_pages(
329                &mut self.inner.pages,
330                &self.inner.visitor_prog,
331                blob_store,
332                &self.inner.row_layout,
333                row,
334                self.squashed_offset,
335            )
336        }?;
337        // SAFETY: We just inserted `ptr`, so it must be present.
338        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
339
340        Ok((row_ref, blob_bytes))
341    }
342
343    /// Physically insert a `row`, encoded in BSATN, into this table,
344    /// storing its large var-len members in the `blob_store`.
345    ///
346    /// On success, returns the hash of the newly-inserted row,
347    /// and a `RowRef` referring to the row.
348    ///
349    /// This does not check for set semantic or unique constraints.
350    ///
351    /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`.
352    /// In this case, a call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
353    ///
354    /// When `row` is not valid BSATN at the table's row type,
355    /// an error is returned and there will be nothing for the caller to revert.
356    pub fn insert_physically_bsatn<'a>(
357        &'a mut self,
358        blob_store: &'a mut dyn BlobStore,
359        row: &[u8],
360    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
361        // Got a static layout? => Use fast-path insertion.
362        let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() {
363            // Before inserting, validate the row, ensuring type safety.
364            // SAFETY: The `static_validator` was derived from the same row layout as the static layout.
365            unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?;
366
367            let fixed_row_size = self.inner.row_layout.size();
368            let squashed_offset = self.squashed_offset;
369            let res = self
370                .inner
371                .pages
372                .with_page_to_insert_row(fixed_row_size, 0, |page| {
373                    // SAFETY: We've used the right `row_size` and we trust that others have too.
374                    // `RowTypeLayout` also ensures that we satisfy the minimum row size.
375                    let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?;
376                    let (mut fixed, _) = page.split_fixed_var_mut();
377                    let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size);
378                    // SAFETY:
379                    // - We've validated that `row` is of sufficient length.
380                    // - The `fixed_buf` is exactly the right `fixed_row_size`.
381                    unsafe { static_layout.deserialize_row_into(fixed_buf, row) };
382                    Ok(fixed_offset)
383                })
384                .map_err(Error::PagesError)?;
385            match res {
386                (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()),
387                (_, Err(e)) => return Err(e),
388            }
389        } else {
390            // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
391            // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
392            unsafe {
393                write_row_to_pages_bsatn(
394                    &mut self.inner.pages,
395                    &self.inner.visitor_prog,
396                    blob_store,
397                    &self.inner.row_layout,
398                    row,
399                    self.squashed_offset,
400                )
401            }?
402        };
403
404        // SAFETY: We just inserted `ptr`, so it must be present.
405        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
406
407        Ok((row_ref, blob_bytes))
408    }
409
410    /// Returns all the columns with sequences that need generation for this `row`.
411    ///
412    /// # Safety
413    ///
414    /// `self.is_row_present(row)` must hold.
415    pub unsafe fn sequence_triggers_for<'a>(
416        &'a self,
417        blob_store: &'a dyn BlobStore,
418        row: RowPointer,
419    ) -> (ColList, SeqIdList) {
420        let sequences = &*self.get_schema().sequences;
421        let row_ty = self.row_layout().product();
422
423        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
424        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) };
425
426        sequences
427            .iter()
428            // Find all the sequences that are triggered by this row.
429            .filter(|seq| {
430                // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements`
431                // as `row_ty` was derived from the same schema as `seq` is part of.
432                let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) };
433                // SAFETY:
434                // - `elem_ty` appears as a column in the row type.
435                // - `AlgebraicValue` is compatible with all types.
436                let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) };
437                val.is_numeric_zero()
438            })
439            .map(|seq| (seq.col_pos, seq.sequence_id))
440            .unzip()
441    }
442
443    /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`.
444    ///
445    /// Truncates the `seq_val` to fit the type of the column.
446    ///
447    /// # Safety
448    ///
449    /// - `self.is_row_present(row)` must hold.
450    /// - `col_id` must be a valid column, with a primitive integer type, of the row type.
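    ///
    /// A sketch of the intended pairing with [`Table::sequence_triggers_for`]
    /// (not a doctest; `next_sequence_value` is a hypothetical sequence allocator):
    ///
    /// ```ignore
    /// // SAFETY: `ptr` is a live row in `table`.
    /// let (cols, seqs) = unsafe { table.sequence_triggers_for(blob_store, ptr) };
    /// for (col_id, seq_id) in cols.iter().zip(seqs) {
    ///     let seq_val: i128 = next_sequence_value(seq_id);
    ///     // SAFETY: `ptr` is live and `col_id` is an integer column backed by a sequence.
    ///     unsafe { table.write_gen_val_to_col(col_id, ptr, seq_val) };
    /// }
    /// ```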
451    pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) {
452        let row_ty = self.inner.row_layout.product();
453        // SAFETY: Caller promised that `col_id` was a valid column.
454        let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) };
455        let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else {
456            // SAFETY: Columns with sequences must be primitive types.
457            unsafe { unreachable_unchecked() }
458        };
459
460        let fixed_row_size = self.inner.row_layout.size();
461        let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size);
462
463        fn write<const N: usize>(dst: &mut [u8], offset: u16, bytes: [u8; N]) {
464            let offset = offset as usize;
465            dst[offset..offset + N].copy_from_slice(&bytes);
466        }
467
468        match col_typ {
469            PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()),
470            PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()),
471            PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()),
472            PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()),
473            PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()),
474            PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()),
475            PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()),
476            PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()),
477            PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()),
478            PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()),
479            PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()),
480            PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()),
481            // SAFETY: Columns with sequences must be integer types.
482            PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => unsafe { unreachable_unchecked() },
483        }
484    }
485
486    /// Performs all the checks necessary after having fully decided on a row's contents.
487    ///
488    /// This includes inserting the row into any applicable indices and/or the pointer map.
489    ///
490    /// On `Ok(_)`, statistics of the table are also updated,
491    /// and the `ptr` still points to a valid row, and otherwise not.
492    ///
493    /// If `CHECK_SAME_ROW` holds, an identical row will be treated as a set-semantic duplicate.
494    /// Otherwise, it will be treated as a unique constraint violation.
495    /// However, `false` should only be passed if it's known beforehand that there is no identical row.
496    ///
497    /// # Safety
498    ///
499    /// `self.is_row_present(row)` must hold.
500    pub unsafe fn confirm_insertion<'a, const CHECK_SAME_ROW: bool>(
501        &'a mut self,
502        blob_store: &'a mut dyn BlobStore,
503        ptr: RowPointer,
504        blob_bytes: BlobNumBytes,
505    ) -> Result<(Option<RowHash>, RowPointer), InsertError> {
506        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
507        let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?;
508        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
509        unsafe { self.insert_into_indices::<CHECK_SAME_ROW>(blob_store, ptr) }?;
510
511        self.update_statistics_added_row(blob_bytes);
512        Ok((hash, ptr))
513    }
514
515    /// Confirms a row update, after first updating indices and checking constraints.
516    ///
517    /// On `Ok(_)`:
518    /// - the statistics of the table are also updated,
519    /// - the `ptr` still points to a valid row.
520    ///
521    /// Otherwise, on `Err(_)`:
522    /// - `ptr` will not point to a valid row,
523    /// - the statistics won't be updated.
524    ///
525    /// # Safety
526    ///
527    /// `self.is_row_present(new_ptr)` and `self.is_row_present(old_ptr)` must hold.
528    pub unsafe fn confirm_update<'a>(
529        &'a mut self,
530        blob_store: &'a mut dyn BlobStore,
531        new_ptr: RowPointer,
532        old_ptr: RowPointer,
533        blob_bytes_added: BlobNumBytes,
534    ) -> Result<RowPointer, InsertError> {
535        // (1) Remove old row from indices.
536        // SAFETY: Caller promised that `self.is_row_present(old_ptr)` holds.
537        unsafe { self.delete_from_indices(blob_store, old_ptr) };
538
539        // Insert new row into indices.
540        // SAFETY: Caller promised that `self.is_row_present(new_ptr)` holds.
541        let res = unsafe { self.insert_into_indices::<true>(blob_store, new_ptr) };
542        if let Err(e) = res {
543            // Undo (1).
544            unsafe { self.insert_into_indices::<true>(blob_store, old_ptr) }
545                .expect("re-inserting the old row into indices should always work");
546            return Err(e);
547        }
548
549        // Remove the old row physically.
550        // SAFETY: The physical `old_ptr` still exists.
551        let blob_bytes_removed = unsafe { self.delete_internal_skip_pointer_map(blob_store, old_ptr) };
552        self.update_statistics_deleted_row(blob_bytes_removed);
553
554        // Update statistics.
555        self.update_statistics_added_row(blob_bytes_added);
556        Ok(new_ptr)
557    }
558
559    /// We've added a row, update the statistics to record this.
560    #[inline]
561    fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) {
562        self.row_count += 1;
563        self.blob_store_bytes += blob_bytes;
564    }
565
566    /// We've removed a row, update the statistics to record this.
567    #[inline]
568    fn update_statistics_deleted_row(&mut self, blob_bytes: BlobNumBytes) {
569        self.row_count -= 1;
570        self.blob_store_bytes -= blob_bytes;
571    }
572
573    /// Insert row identified by `new` into indices.
574    /// This also checks unique constraints.
575    /// Deletes the row if there were any violations.
576    ///
577    /// If `CHECK_SAME_ROW`, upon a unique constraint violation,
578    /// this will check if it's really a duplicate row.
579    /// Otherwise, the unique constraint violation is returned.
580    ///
581    /// SAFETY: `self.is_row_present(new)` must hold.
582    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
583    unsafe fn insert_into_indices<'a, const CHECK_SAME_ROW: bool>(
584        &'a mut self,
585        blob_store: &'a mut dyn BlobStore,
586        new: RowPointer,
587    ) -> Result<(), InsertError> {
588        self.indexes
589            .iter_mut()
590            .try_for_each(|(index_id, index)| {
591                // SAFETY: We just inserted `ptr`, so it must be present.
592                let new = unsafe { self.inner.get_row_ref_unchecked(blob_store, new) };
593                // SAFETY: any index in this table was constructed with the same row type as this table.
594                let violation = unsafe { index.check_and_insert(new) };
595                violation.map_err(|old| (*index_id, old, new))
596            })
597            .map_err(|(index_id, old, new)| {
598                // Found unique constraint violation!
599                if CHECK_SAME_ROW
600                    // If the index was added in this tx,
601                    // `old` could be a committed row,
602                    // which we want to avoid here.
603                    // TODO(centril): not 100% correct, could still be a duplicate,
604                    // but this is rather pathological and should be fixed when we restructure.
605                    && old.squashed_offset().is_tx_state()
606                    // SAFETY:
607                    // - The row layouts are the same as it's the same table.
608                    // - We know `old` exists in `self` as we just found it in an index.
609                    // - Caller promised that `new` is valid for `self`.
610                    && unsafe { Self::eq_row_in_page(self, old, self, new.pointer()) }
611                {
612                    return (index_id, DuplicateError(old).into());
613                }
614
615                let index = self.indexes.get(&index_id).unwrap();
616                let value = new.project(&index.indexed_columns).unwrap();
617                let error = self.build_error_unique(index, index_id, value).into();
618                (index_id, error)
619            })
620            .map_err(|(index_id, error)| {
621                // Delete row from indices.
622                // Do this before the actual deletion, as `index.delete` needs a `RowRef`
623                // so it can extract the appropriate value.
624                // SAFETY: We just inserted `new`, so it must be present.
625                unsafe { self.delete_from_indices_until(blob_store, new, index_id) };
626
627        // Cleanup: undo the physical insertion of `new`.
628                // SAFETY: We just inserted `new`, so it must be present.
629                unsafe { self.delete_internal(blob_store, new) };
630
631                error
632            })
633    }
634
635    /// Finds the [`RowPointer`] to the row in `target_table` equal, if any,
636    /// to the row `needle_ptr` in `needle_table`,
637    /// by any unique index in `target_table`.
638    ///
639    /// # Safety
640    ///
641    /// - `target_table` and `needle_table` must have the same `row_layout`.
642    /// - `needle_table.is_row_present(needle_ptr)` must hold.
643    unsafe fn find_same_row_via_unique_index(
644        target_table: &Table,
645        needle_table: &Table,
646        needle_bs: &dyn BlobStore,
647        needle_ptr: RowPointer,
648    ) -> Option<RowPointer> {
649        // Use some index (the one with the lowest `IndexId` currently).
650        // TODO(centril): this isn't what we actually want.
651        // Rather, we'd prefer the index with the simplest type,
652        // but this is left as future work as we don't have to optimize this method now.
653        let target_index = target_table
654            .indexes
655            .values()
656            .find(|idx| idx.is_unique())
657            .expect("there should be at least one unique index");
658        // Project the needle row to the columns of the index, and then seek.
659        // As this is a unique index, there are 0-1 rows for this key.
660        let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
661        let key = needle_row
662            .project(&target_index.indexed_columns)
663            .expect("needle row should be valid");
664        target_index.seek_point(&key).next().filter(|&target_ptr| {
665            // SAFETY:
666            // - Caller promised that the row layouts were the same.
667            // - We know `target_ptr` exists, as it was in `target_index`, belonging to `target_table`.
668            // - Caller promised that `needle_ptr` is valid for `needle_table`.
669            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
670        })
671    }
672
673    /// Insert the row identified by `ptr` into the table's [`PointerMap`],
674    /// if the table has one.
675    ///
676    /// This checks for set semantic violations.
677    /// If a set semantic conflict (i.e. duplicate row) is detected by the pointer map,
678    /// the row will be deleted and an error returned.
679    /// If the pointer map confirms that the row was unique, returns the `RowHash` of that row.
680    ///
681    /// If this table has no `PointerMap`, returns `Ok(None)`.
682    /// In that case, the row's uniqueness will be verified by [`Self::insert_into_indices`],
683    /// as this table has at least one unique index.
684    ///
685    /// SAFETY: `self.is_row_present(row)` must hold.
686    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
687    unsafe fn insert_into_pointer_map<'a>(
688        &'a mut self,
689        blob_store: &'a mut dyn BlobStore,
690        ptr: RowPointer,
691    ) -> Result<Option<RowHash>, DuplicateError> {
692        if self.pointer_map.is_none() {
693            // No pointer map? Set semantic constraint is checked by a unique index instead.
694            return Ok(None);
695        };
696
697        // SAFETY:
698        // - `self` trivially has the same `row_layout` as `self`.
699        // - Caller promised that `self.is_row_present(row)` holds.
700        let (hash, existing_row) = unsafe { Self::find_same_row_via_pointer_map(self, self, blob_store, ptr, None) };
701
702        if let Some(existing_row) = existing_row {
703            // If an equal row was already present,
704            // roll back our optimistic insert to avoid violating set semantics.
705
706            // SAFETY: Caller promised that `ptr` is a valid row in `self`.
707            unsafe {
708                self.inner
709                    .pages
710                    .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
711            };
712            return Err(DuplicateError(existing_row));
713        }
714
715        // If the optimistic insertion was correct,
716        // i.e. this is not a set-semantic duplicate,
717        // add it to the `pointer_map`.
718        self.pointer_map
719            .as_mut()
720            .expect("pointer map should exist, as it did previously")
721            .insert(hash, ptr);
722
723        Ok(Some(hash))
724    }
725
726    /// Returns the list of pointers to rows which hash to `row_hash`.
727    ///
728    /// If `self` does not have a [`PointerMap`], always returns the empty slice.
729    fn pointers_for(&self, row_hash: RowHash) -> &[RowPointer] {
730        self.pointer_map.as_ref().map_or(&[], |pm| pm.pointers_for(row_hash))
731    }
732
733    /// Using the [`PointerMap`],
734    /// searches `target_table` for a row equal to `needle_table[needle_ptr]`.
735    ///
736    /// Rows are compared for equality by [`eq_row_in_page`].
737    ///
738    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
739    ///
740    /// Used for detecting set-semantic duplicates when inserting
741    /// into tables without any unique constraints.
742    ///
743    /// Does nothing and always returns `None` if `target_table` does not have a `PointerMap`,
744    /// in which case the caller should instead use [`Self::find_same_row_via_unique_index`].
745    ///
746    /// Note that we don't need the blob store to compute equality,
747    /// as content-addressing means it's sufficient to compare the hashes of large blobs.
748    /// (If we see a collision in `BlobHash` we have bigger problems.)
749    ///
750    /// # Safety
751    ///
752    /// - `target_table` and `needle_table` must have the same `row_layout`.
753    /// - `needle_table.is_row_present(needle_ptr)`.
754    pub unsafe fn find_same_row_via_pointer_map(
755        target_table: &Table,
756        needle_table: &Table,
757        needle_bs: &dyn BlobStore,
758        needle_ptr: RowPointer,
759        row_hash: Option<RowHash>,
760    ) -> (RowHash, Option<RowPointer>) {
761        let row_hash = row_hash.unwrap_or_else(|| {
762            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
763            let row_ref = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
764            row_ref.row_hash()
765        });
766
767        // Scan all the row pointers with `row_hash` in `target_table`.
768        let row_ptr = target_table.pointers_for(row_hash).iter().copied().find(|&target_ptr| {
769            // SAFETY:
770            // - Caller promised that the row layouts were the same.
771            // - We know `target_ptr` exists, as it was found in a pointer map.
772            // - Caller promised that `needle_ptr` is valid for `needle_table`.
773            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
774        });
775
776        (row_hash, row_ptr)
777    }
778
779    /// Returns whether the row `target_ptr` in `target_table`
780    /// is exactly equal to the row `needle_ptr` in `needle_table`.
781    ///
782    /// # Safety
783    ///
784    /// - `target_table` and `needle_table` must have the same `row_layout`.
785    /// - `target_table.is_row_present(target_ptr)`.
786    /// - `needle_table.is_row_present(needle_ptr)`.
787    pub unsafe fn eq_row_in_page(
788        target_table: &Table,
789        target_ptr: RowPointer,
790        needle_table: &Table,
791        needle_ptr: RowPointer,
792    ) -> bool {
793        let (target_page, target_offset) = target_table.inner.page_and_offset(target_ptr);
794        let (needle_page, needle_offset) = needle_table.inner.page_and_offset(needle_ptr);
795
796        // SAFETY:
797        // - Caller promised that `target_ptr` is valid, so `target_page` and `target_offset` are both valid.
798        // - Caller promised that `needle_ptr` is valid, so `needle_page` and `needle_offset` are both valid.
799        // - Caller promised that the layouts of `target_table` and `needle_table` are the same,
800        //   so `target_table` applies to both.
801        //   Moreover `(x: Table).inner.static_layout` is always derived from `x.row_layout`.
802        unsafe {
803            eq_row_in_page(
804                target_page,
805                needle_page,
806                target_offset,
807                needle_offset,
808                &target_table.inner.row_layout,
809                target_table.static_layout(),
810            )
811        }
812    }
813
814    /// Searches `target_table` for a row equal to `needle_table[needle_ptr]`,
815    /// and returns the [`RowPointer`] to that row in `target_table`, if it exists.
816    ///
817    /// Searches using the [`PointerMap`] or a unique index, as appropriate for the table.
818    ///
819    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
820    ///
821    /// # Safety
822    ///
823    /// - `target_table` and `needle_table` must have the same `row_layout`.
824    /// - `needle_table.is_row_present(needle_ptr)` must hold.
825    pub unsafe fn find_same_row(
826        target_table: &Table,
827        needle_table: &Table,
828        needle_bs: &dyn BlobStore,
829        needle_ptr: RowPointer,
830        row_hash: Option<RowHash>,
831    ) -> (Option<RowHash>, Option<RowPointer>) {
832        if target_table.pointer_map.is_some() {
833            // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
834            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
835            let (row_hash, row_ptr) = unsafe {
836                Self::find_same_row_via_pointer_map(target_table, needle_table, needle_bs, needle_ptr, row_hash)
837            };
838            (Some(row_hash), row_ptr)
839        } else {
840            (
841                row_hash,
842                // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
843                // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
844                unsafe { Self::find_same_row_via_unique_index(target_table, needle_table, needle_bs, needle_ptr) },
845            )
846        }
847    }
848
849    /// Returns a [`RowRef`] for `ptr` or `None` if the row isn't present.
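    ///
    /// A minimal sketch (not a doctest; assumes `table`, `blob_store`, and some `ptr: RowPointer`):
    ///
    /// ```ignore
    /// if let Some(row_ref) = table.get_row_ref(blob_store, ptr) {
    ///     // The row is present; e.g. materialize it as a `ProductValue`.
    ///     let _pv = row_ref.to_product_value();
    /// }
    /// ```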
850    pub fn get_row_ref<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> Option<RowRef<'a>> {
851        self.is_row_present(ptr)
852            // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
853            .then(|| unsafe { self.get_row_ref_unchecked(blob_store, ptr) })
854    }
855
856    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
857    ///
858    /// # Safety
859    ///
860    /// The requirement is that `self.is_row_present(ptr)` must hold.
861    /// That is, `ptr` must refer to a row within `self`
862    /// which was previously inserted and has not been deleted since.
863    ///
864    /// This means:
865    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
866    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
867    ///   and must refer to a valid, live row in that page.
868    /// - The `SquashedOffset` of `ptr` must match `self.squashed_offset`.
869    ///
870    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
871    /// and has not been passed to [`Table::delete(table, ..)`]
872    /// is sufficient to demonstrate all of these properties.
873    pub unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
874        debug_assert!(self.is_row_present(ptr));
875        // SAFETY: Caller promised that ^-- holds.
876        unsafe { RowRef::new(&self.inner, blob_store, ptr) }
877    }
878
879    /// Deletes a row in the page manager
880    /// without deleting it logically in the pointer map.
881    ///
882    /// # Safety
883    ///
884    /// `ptr` must point to a valid, live row in this table.
885    pub unsafe fn delete_internal_skip_pointer_map(
886        &mut self,
887        blob_store: &mut dyn BlobStore,
888        ptr: RowPointer,
889    ) -> BlobNumBytes {
890        // Delete the physical row.
891        //
892        // SAFETY:
893        // - `ptr` points to a valid row in this table, per our invariants.
894        // - `self.row_size` known to be consistent with `self.pages`,
895        //    as the two are tied together in `Table::new`.
896        unsafe {
897            self.inner
898                .pages
899                .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
900        }
901    }
902
903    /// Deletes the row identified by `ptr` from the table.
904    ///
905    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
906    ///
907    /// NOTE: This method skips updating indexes.
908    /// Use `delete_unchecked` or `delete` to delete a row with index updating.
909    ///
910    /// SAFETY: `self.is_row_present(row)` must hold.
911    unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
912        // Remove the set semantic association.
913        if let Some(pointer_map) = &mut self.pointer_map {
914            // SAFETY: `self.is_row_present(row)` holds.
915            let row = unsafe { RowRef::new(&self.inner, blob_store, ptr) };
916
917            let _remove_result = pointer_map.remove(row.row_hash(), ptr);
918            debug_assert!(_remove_result);
919        }
920
921        // Delete the physical row.
922        // SAFETY: `ptr` points to a valid row in this table as `self.is_row_present(row)` holds.
923        unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) }
924    }
925
926    /// Deletes the row identified by `ptr` from the table.
927    ///
928    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
929    ///
930    /// SAFETY: `self.is_row_present(row)` must hold.
931    unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
932        // Delete row from indices.
933        // Do this before the actual deletion, as `index.delete` needs a `RowRef`
934        // so it can extract the appropriate value.
935        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
936        unsafe { self.delete_from_indices(blob_store, ptr) };
937
938        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
939        unsafe { self.delete_internal(blob_store, ptr) }
940    }
941
942    /// Deletes the row identified by `ptr` from all the indices of this table until `index_id` is reached.
943    /// The range is exclusive of `index_id`.
944    ///
945    /// SAFETY: `self.is_row_present(row)` must hold.
946    unsafe fn delete_from_indices_until(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer, index_id: IndexId) {
947        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
948        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
949
950        for (_, index) in self.indexes.range_mut(..index_id) {
951            index.delete(row_ref).unwrap();
952        }
953    }
954
955    /// Deletes the row identified by `ptr` from all the indices of this table.
956    ///
957    /// SAFETY: `self.is_row_present(row)` must hold.
958    unsafe fn delete_from_indices(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer) {
959        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
960        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
961
962        for index in self.indexes.values_mut() {
963            index.delete(row_ref).unwrap();
964        }
965    }
966
967    /// Deletes the row identified by `ptr` from the table.
968    ///
969    /// The function `before` is run on the to-be-deleted row,
970    /// if it is present, before deleting.
971    /// This enables callers to extract the deleted row.
972    /// E.g. applying deletes when squashing/merging a transaction into the committed state
973    /// passes `|row| row.to_product_value()` as `before`
974    /// so that the resulting `ProductValue`s can be passed to the subscription evaluator.
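    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest; assumes `table`, `blob_store`, and a `ptr: RowPointer`):
    ///
    /// ```ignore
    /// // Delete the row, extracting its contents before they are gone.
    /// if let Some(deleted) = table.delete(blob_store, ptr, |row| row.to_product_value()) {
    ///     // `deleted` is the `ProductValue` of the now-removed row.
    /// }
    /// ```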
975    pub fn delete<'a, R>(
976        &'a mut self,
977        blob_store: &'a mut dyn BlobStore,
978        ptr: RowPointer,
979        before: impl for<'b> FnOnce(RowRef<'b>) -> R,
980    ) -> Option<R> {
981        if !self.is_row_present(ptr) {
982            return None;
983        };
984
985        // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
986        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
987
988        let ret = before(row_ref);
989
990        // SAFETY: We've checked above that `self.is_row_present(ptr)`.
991        let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) };
992        self.update_statistics_deleted_row(blob_bytes_deleted);
993
994        Some(ret)
995    }
996
997    /// If a row exists in `self` which matches `row`
998    /// by [`Table::find_same_row`],
999    /// delete that row.
1000    ///
1001    /// If a matching row was found, returns the pointer to that row.
1002    /// The returned pointer is now invalid, as the row to which it referred has been deleted.
1003    ///
1004    /// This operation works by temporarily inserting the `row` into `self`,
1005    /// checking `find_same_row` on the newly-inserted row,
1006    /// deleting the matching row if it exists,
1007    /// then deleting the temporary insertion.
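    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest; assumes `table`, `blob_store`, and a `row: ProductValue`):
    ///
    /// ```ignore
    /// if let Some(old_ptr) = table.delete_equal_row(blob_store, &row)? {
    ///     // A row equal to `row` existed and has now been deleted;
    ///     // `old_ptr` must no longer be dereferenced.
    /// }
    /// ```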
1008    pub fn delete_equal_row(
1009        &mut self,
1010        blob_store: &mut dyn BlobStore,
1011        row: &ProductValue,
1012    ) -> Result<Option<RowPointer>, Error> {
1013        // Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row.
1014        // This must avoid consulting and inserting to the pointer map,
1015        // as the row is already present, set-semantically.
1016        let (temp_row, _) = self.insert_physically_pv(blob_store, row)?;
1017        let temp_ptr = temp_row.pointer();
1018
1019        // Find the row equal to the passed-in `row`.
1020        // This uses one of two approaches.
1021        // Either there is a pointer map, so we use that,
1022        // or there is at least one unique index, so we use one of them.
1023        //
1024        // SAFETY:
1025        // - `self` trivially has the same `row_layout` as `self`.
1026        // - We just inserted `temp_ptr`, so it's valid.
1027        let (_, existing_row_ptr) = unsafe { Self::find_same_row(self, self, blob_store, temp_ptr, None) };
1028
1029        // If an equal row was present, delete it.
1030        if let Some(existing_row_ptr) = existing_row_ptr {
1031            let blob_bytes_deleted = unsafe {
1032                // SAFETY: `find_same_row` ensures that the pointer is valid.
1033                self.delete_unchecked(blob_store, existing_row_ptr)
1034            };
1035            self.update_statistics_deleted_row(blob_bytes_deleted);
1036        }
1037
1038        // Remove the temporary row we inserted in the beginning.
1039        // Avoid the pointer map, since we don't want to delete it twice.
1040        // SAFETY: `temp_ptr` is valid as we just inserted it.
1041        unsafe {
1042            self.delete_internal_skip_pointer_map(blob_store, temp_ptr);
1043        }
1044
1045        Ok(existing_row_ptr)
1046    }
1047
1048    /// Returns the row type for rows in this table.
1049    pub fn get_row_type(&self) -> &ProductType {
1050        self.get_schema().get_row_type()
1051    }
1052
1053    /// Returns the schema for this table.
1054    pub fn get_schema(&self) -> &Arc<TableSchema> {
1055        &self.schema
1056    }
1057
1058    /// Runs a mutation on the [`TableSchema`] of this table.
1059    ///
1060    /// This uses a clone-on-write mechanism.
1061    /// If none but `self` refers to the schema, then the mutation will be in-place.
1062    /// Otherwise, the schema must be cloned, mutated,
1063    /// and then the cloned version is written back to the table.
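    ///
    /// A minimal sketch (not a doctest; mirrors how [`Table::delete_index`] prunes the schema):
    ///
    /// ```ignore
    /// table.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
    /// ```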
1064    pub fn with_mut_schema(&mut self, with: impl FnOnce(&mut TableSchema)) {
1065        with(Arc::make_mut(&mut self.schema));
1066    }
1067
1068    /// Returns a new [`TableIndex`] for `table`.
1069    pub fn new_index(&self, algo: &IndexAlgorithm, is_unique: bool) -> Result<TableIndex, InvalidFieldError> {
1070        TableIndex::new(self.get_schema().get_row_type(), algo, is_unique)
1071    }
1072
1073    /// Inserts a new `index` into the table.
1074    ///
1075    /// The index will be populated using the rows of the table.
1076    ///
1077    /// # Panics
1078    ///
1079    /// Panics if any row would violate `index`'s unique constraint, if it has one.
1080    ///
1081    /// # Safety
1082    ///
1083    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
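    ///
    /// A sketch of building and installing an index (not a doctest; assumes an
    /// `algo: IndexAlgorithm` taken from the table's own schema and a fresh `index_id`):
    ///
    /// ```ignore
    /// let index = table.new_index(&algo, /* is_unique */ true)?;
    /// // SAFETY: `index` was just built from `table`'s own row type.
    /// unsafe { table.insert_index(blob_store, index_id, index) };
    /// ```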
1084    pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) {
1085        let rows = self.scan_rows(blob_store);
1086        // SAFETY: Caller promised that table's row type/layout
1087        // matches that which `index` was constructed with.
1088        // It follows that this applies to any `rows`, as required.
1089        let violation = unsafe { index.build_from_rows(rows) };
1090        violation.unwrap_or_else(|ptr| {
1091            panic!("adding `index` should cause no unique constraint violations, but {ptr:?} would")
1092        });
1093        // SAFETY: Forward caller requirement.
1094        unsafe { self.add_index(index_id, index) };
1095    }
1096
1097    /// Adds an index to the table without populating it.
1098    ///
1099    /// # Safety
1100    ///
1101    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
1102    pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) {
1103        let is_unique = index.is_unique();
1104        self.indexes.insert(index_id, index);
1105
1106        // Remove the pointer map, if any.
1107        if is_unique {
1108            self.pointer_map = None;
1109        }
1110    }
1111
1112    /// Removes an index from the table.
1113    ///
1114    /// Returns whether an index existed with `index_id`.
1115    pub fn delete_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId) -> bool {
1116        let index = self.indexes.remove(&index_id);
1117        let ret = index.is_some();
1118
1119        // If we removed the last unique index, add a pointer map.
1120        if index.is_some_and(|i| i.is_unique()) && !self.indexes.values().any(|idx| idx.is_unique()) {
1121            self.rebuild_pointer_map(blob_store);
1122        }
1123
1124        // Remove index from schema.
1125        //
1126        // This will likely do a clone-on-write, as over time
1127        // the schema may have acquired other referents.
1128        self.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
1129
1130        ret
1131    }
1132
1133    /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s.
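    ///
    /// A minimal sketch (not a doctest; assumes `table` and `blob_store`):
    ///
    /// ```ignore
    /// for row_ref in table.scan_rows(blob_store) {
    ///     // Each `row_ref` is a live row; e.g. materialize it.
    ///     let _pv = row_ref.to_product_value();
    /// }
    /// ```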
1134    pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> {
1135        TableScanIter {
1136            current_page: None, // Will be filled by the iterator.
1137            current_page_idx: PageIndex(0),
1138            table: self,
1139            blob_store,
1140        }
1141    }
1142
1143    /// Returns this table combined with the index for [`IndexId`], if any.
1144    pub fn get_index_by_id_with_table<'a>(
1145        &'a self,
1146        blob_store: &'a dyn BlobStore,
1147        index_id: IndexId,
1148    ) -> Option<TableAndIndex<'a>> {
1149        Some(TableAndIndex {
1150            table: self,
1151            blob_store,
1152            index: self.get_index_by_id(index_id)?,
1153        })
1154    }
1155
1156    /// Returns the [`TableIndex`] for this [`IndexId`].
1157    pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&TableIndex> {
1158        self.indexes.get(&index_id)
1159    }
1160
1161    /// Returns this table combined with the first index with `cols`, if any.
1162    pub fn get_index_by_cols_with_table<'a>(
1163        &'a self,
1164        blob_store: &'a dyn BlobStore,
1165        cols: &ColList,
1166    ) -> Option<TableAndIndex<'a>> {
1167        let (_, index) = self.get_index_by_cols(cols)?;
1168        Some(TableAndIndex {
1169            table: self,
1170            blob_store,
1171            index,
1172        })
1173    }
1174
1175    /// Returns the first [`TableIndex`] with the given [`ColList`].
1176    pub fn get_index_by_cols(&self, cols: &ColList) -> Option<(IndexId, &TableIndex)> {
1177        self.indexes
1178            .iter()
1179            .find(|(_, index)| &index.indexed_columns == cols)
1180            .map(|(id, idx)| (*id, idx))
1181    }
1182
1183    /// Clones the structure of this table into a new one with
1184    /// the same schema, visitor program, and indices.
1185    /// The new table will be completely empty
1186    /// and will use the given `squashed_offset` instead of that of `self`.
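    ///
    /// A minimal sketch (not a doctest; e.g. deriving a tx-state scratchpad table
    /// from a committed one):
    ///
    /// ```ignore
    /// let tx_table = committed_table.clone_structure(SquashedOffset::TX_STATE);
    /// assert_eq!(tx_table.num_rows(), 0);
    /// ```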
1187    pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self {
1188        let schema = self.schema.clone();
1189        let layout = self.row_layout().clone();
1190        let sbl = self.inner.static_layout.clone();
1191        let visitor = self.inner.visitor_prog.clone();
1192        // If we had a pointer map, we'll have one in the cloned one as well, but empty.
1193        let pm = self.pointer_map.as_ref().map(|_| PointerMap::default());
1194        let mut new = Table::new_with_indexes_capacity(schema, layout, sbl, visitor, squashed_offset, pm);
1195
1196        // Clone the index structure. The table is empty, so no need to `build_from_rows`.
1197        for (&index_id, index) in self.indexes.iter() {
1198            new.indexes.insert(index_id, index.clone_structure());
1199        }
1200        new
1201    }
1202
1203    /// Returns the number of bytes occupied by the pages and the blob store.
1204    /// Note that the result can be larger than the actual physical size occupied by the table
1205    /// because the blob store implementation can do internal optimizations.
1206    /// For more details, refer to the documentation of `self.blob_store_bytes`.
1207    pub fn bytes_occupied_overestimate(&self) -> usize {
1208        (self.num_pages() * PAGE_DATA_SIZE) + (self.blob_store_bytes.0)
1209    }
1210
1211    /// Reset the internal storage of `self` to be `pages`.
1212    ///
1213    /// This recomputes the pointer map based on the `pages`,
1214    /// but does not recompute indexes.
1215    ///
1216    /// Used when restoring from a snapshot.
1217    ///
1218    /// # Safety
1219    ///
1220    /// The schema of rows stored in the `pages` must exactly match `self.schema` and `self.inner.row_layout`.
1221    pub unsafe fn set_pages(&mut self, pages: Vec<Box<Page>>, blob_store: &dyn BlobStore) {
1222        self.inner.pages.set_contents(pages, self.inner.row_layout.size());
1223
1224        // Recompute table metadata based on the new pages.
1225        // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate.
1226        self.compute_row_count(blob_store);
1227        self.rebuild_pointer_map(blob_store);
1228    }
1229
1230    /// Returns the number of rows resident in this table.
1231    ///
1232    /// This scales in runtime with the number of pages in the table.
1233    pub fn num_rows(&self) -> u64 {
1234        self.pages().iter().map(|page| page.num_rows() as u64).sum()
1235    }
1236
1237    #[cfg(test)]
1238    fn reconstruct_num_rows(&self) -> u64 {
1239        self.pages().iter().map(|page| page.reconstruct_num_rows() as u64).sum()
1240    }
1241
1242    /// Returns the number of bytes used by rows resident in this table.
1243    ///
1244    /// This includes data bytes, padding bytes and some overhead bytes,
1245    /// as described in the docs for [`Page::bytes_used_by_rows`],
1246    /// but *does not* include:
1247    ///
1248    /// - Unallocated space within pages.
1249    /// - Per-page overhead (e.g. page headers).
1250    /// - Table overhead (e.g. the [`RowTypeLayout`], [`PointerMap`], [`Schema`] &c).
1251    /// - Indexes.
1252    /// - Large blobs in the [`BlobStore`].
1253    ///
1254    /// Of these, the caller should inspect the blob store in order to account for memory usage by large blobs,
1255    /// and call [`Self::bytes_used_by_index_keys`] to account for indexes,
1256    /// but we intend to eat all the other overheads when billing.
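    ///
    /// A hedged accounting sketch combining the pieces described above
    /// (`bytes_used_by_blobs` is a caller-derived quantity, named here only for illustration):
    ///
    /// ```ignore
    /// let billed = table.bytes_used_by_rows()
    ///     + table.bytes_used_by_index_keys()
    ///     + bytes_used_by_blobs; // from inspecting the caller's `BlobStore`
    /// ```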
1257    pub fn bytes_used_by_rows(&self) -> u64 {
1258        self.pages()
1259            .iter()
1260            .map(|page| page.bytes_used_by_rows(self.inner.row_layout.size()) as u64)
1261            .sum()
1262    }
1263
1264    #[cfg(test)]
1265    fn reconstruct_bytes_used_by_rows(&self) -> u64 {
1266        self.pages()
1267            .iter()
1268            .map(|page| unsafe {
1269                // Safety: `page` is in `self`, and was constructed using `self.inner.row_layout` and `self.inner.visitor_prog`,
1270                // so the three are mutually consistent.
1271                page.reconstruct_bytes_used_by_rows(self.inner.row_layout.size(), &self.inner.visitor_prog)
1272            } as u64)
1273            .sum()
1274    }
1275
1276    /// Returns the number of rows (or [`RowPointer`]s, more accurately)
1277    /// stored in indexes by this table.
1278    ///
1279    /// This scales in runtime with the number of pages in the table, as it calls [`Self::num_rows`].
1280    pub fn num_rows_in_indexes(&self) -> u64 {
1281        // Assume that each index contains all rows in the table.
1282        self.num_rows() * self.indexes.len() as u64
1283    }
1284
1285    /// Returns the number of bytes used by keys stored in indexes by this table.
1286    ///
1287    /// This method scales in runtime with the number of indexes in the table,
1288    /// but not with the number of pages or rows.
1289    ///
1290    /// Key size is measured using a metric called "key size" or "data size,"
1291    /// which is intended to capture the number of live user-supplied bytes,
1292    /// not including representational overhead.
1293    /// This is distinct from the BFLATN size measured by [`Self::bytes_used_by_rows`].
1294    /// See the trait [`crate::table_index::KeySize`] for specifics on the metric measured.
1295    pub fn bytes_used_by_index_keys(&self) -> u64 {
1296        self.indexes.values().map(|idx| idx.num_key_bytes()).sum()
1297    }
1298}
1299
1300/// A reference to a single row within a table.
1301///
1302/// # Safety
1303///
1304/// Having a `r: RowRef` is a proof that [`r.pointer()`](RowRef::pointer) refers to a valid row.
1305/// This makes constructing a `RowRef`, i.e., `RowRef::new`, an `unsafe` operation.
1306#[derive(Copy, Clone)]
1307pub struct RowRef<'a> {
1308    /// The table that has the row at `self.pointer`.
1309    table: &'a TableInner,
1310    /// The blob store used in case there are blob hashes to resolve.
1311    blob_store: &'a dyn BlobStore,
1312    /// The pointer to the row in `self.table`.
1313    pointer: RowPointer,
1314}
1315
1316impl fmt::Debug for RowRef<'_> {
1317    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1318        fmt.debug_struct("RowRef")
1319            .field("pointer", &self.pointer)
1320            .field("value", &self.to_product_value())
1321            .finish_non_exhaustive()
1322    }
1323}
1324
1325impl<'a> RowRef<'a> {
1326    /// Construct a `RowRef` to the row at `pointer` within `table`.
1327    ///
1328    /// # Safety
1329    ///
1330    /// `pointer` must refer to a row within `table`
1331    /// which was previously inserted and has not been deleted since.
1332    ///
1333    /// This means:
1334    /// - The `PageIndex` of `pointer` must be in-bounds for `table.pages`.
1335    /// - The `PageOffset` of `pointer` must be properly aligned for the row type of `table`,
1336    ///   and must refer to a valid, live row in that page.
1337    /// - The `SquashedOffset` of `pointer` must match `table.squashed_offset`.
1338    ///
1339    /// Showing that `pointer` was the result of a call to `table.insert`
1340    /// and has not been passed to `table.delete`
1341    /// is sufficient to demonstrate all of these properties.
1342    unsafe fn new(table: &'a TableInner, blob_store: &'a dyn BlobStore, pointer: RowPointer) -> Self {
1343        Self {
1344            table,
1345            blob_store,
1346            pointer,
1347        }
1348    }
1349
1350    /// Extract a `ProductValue` from the table.
1351    ///
1352    /// This is a potentially expensive operation,
1353    /// as it must walk the table's `ProductTypeLayout`
1354    /// and heap-allocate various substructures of the `ProductValue`.
1355    pub fn to_product_value(&self) -> ProductValue {
1356        let res = self
1357            .serialize(ValueSerializer)
1358            .unwrap_or_else(|x| match x {})
1359            .into_product();
1360        // SAFETY: the top layer of a row when serialized is always a product.
1361        unsafe { res.unwrap_unchecked() }
1362    }
1363
1364    /// Check that the `col`th column of the row type stored by `self` is compatible with `T`,
1365    /// and read the value of that column from `self`.
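    ///
    /// A sketch (assuming `row_ref` is a [`RowRef`] whose column 0 is of type `I64`):
    ///
    /// ```ignore
    /// let x: i64 = row_ref.read_col(ColId(0))?;
    /// ```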
1366    #[inline]
1367    pub fn read_col<T: ReadColumn>(self, col: impl Into<ColId>) -> Result<T, TypeError> {
1368        T::read_column(self, col.into().idx())
1369    }
1370
1371    /// Construct a projection of the row at `self` by extracting the `cols`.
1372    ///
1373    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1374    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
1375    ///
1376    /// # Safety
1377    ///
1378    /// - `cols` must not specify any column which is out-of-bounds for the row `self`.
1379    pub unsafe fn project_unchecked(self, cols: &ColList) -> AlgebraicValue {
1380        let col_layouts = &self.row_layout().product().elements;
1381
1382        if let Some(head) = cols.as_singleton() {
1383            let head = head.idx();
1384            // SAFETY: caller promised that `head` is in-bounds of `col_layouts`.
1385            let col_layout = unsafe { col_layouts.get_unchecked(head) };
1386            // SAFETY:
1387            // - `col_layout` was just derived from the row layout.
1388            // - `AlgebraicValue` is compatible with any  `col_layout`.
1389            // - `self` is a valid row and offsetting to `col_layout` is valid.
1390            return unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) };
1391        }
1392        let mut elements = Vec::with_capacity(cols.len() as usize);
1393        for col in cols.iter() {
1394            let col = col.idx();
1395            // SAFETY: caller promised that any `col` is in-bounds of `col_layouts`.
1396            let col_layout = unsafe { col_layouts.get_unchecked(col) };
1397            // SAFETY:
1398            // - `col_layout` was just derived from the row layout.
1399            // - `AlgebraicValue` is compatible with any  `col_layout`.
1400            // - `self` is a valid row and offsetting to `col_layout` is valid.
1401            elements.push(unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) });
1402        }
1403        AlgebraicValue::product(elements)
1404    }
1405
1406    /// Construct a projection of the row at `self` by extracting the `cols`.
1407    ///
1408    /// Returns an error if `cols` specifies an index which is out-of-bounds for the row at `self`.
1409    ///
1410    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1411    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
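    ///
    /// A sketch (assuming the row has at least two columns; `col_list!` is from `spacetimedb_primitives`):
    ///
    /// ```ignore
    /// // A single column is returned unwrapped.
    /// let a = row_ref.project(&ColList::from(ColId(0)))?;
    /// // Multiple columns are wrapped in a product.
    /// let ab = row_ref.project(&col_list![0, 1])?;
    /// ```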
1412    pub fn project(self, cols: &ColList) -> Result<AlgebraicValue, InvalidFieldError> {
1413        if let Some(head) = cols.as_singleton() {
1414            return self.read_col(head).map_err(|_| head.into());
1415        }
1416        let mut elements = Vec::with_capacity(cols.len() as usize);
1417        for col in cols.iter() {
1418            let col_val = self.read_col(col).map_err(|err| match err {
1419                TypeError::WrongType { .. } => {
1420                    unreachable!("AlgebraicValue::read_column never returns a `TypeError::WrongType`")
1421                }
1422                TypeError::IndexOutOfBounds { .. } => col,
1423            })?;
1424            elements.push(col_val);
1425        }
1426        Ok(AlgebraicValue::product(elements))
1427    }
1428
1429    /// Returns the raw row pointer for this row reference.
1430    pub fn pointer(&self) -> RowPointer {
1431        self.pointer
1432    }
1433
1434    /// Returns the blob store that any [`crate::blob_store::BlobHash`]es within the row refer to.
1435    pub(crate) fn blob_store(&self) -> &dyn BlobStore {
1436        self.blob_store
1437    }
1438
1439    /// Return the layout of the row.
1440    ///
1441    /// All rows within the same table will have the same layout.
1442    pub fn row_layout(&self) -> &RowTypeLayout {
1443        &self.table.row_layout
1444    }
1445
1446    /// Returns the page the row is in and the offset of the row within that page.
1447    pub fn page_and_offset(&self) -> (&Page, PageOffset) {
1448        self.table.page_and_offset(self.pointer())
1449    }
1450
1451    /// Returns the bytes for the fixed portion of this row.
1452    pub(crate) fn get_row_data(&self) -> &Bytes {
1453        let (page, offset) = self.page_and_offset();
1454        page.get_row_data(offset, self.table.row_layout.size())
1455    }
1456
1457    /// Returns the row hash of the row referred to by `self`.
1458    pub fn row_hash(&self) -> RowHash {
1459        RowHash(RowHash::hasher_builder().hash_one(self))
1460    }
1461
1462    /// Returns the static layout for this row reference, if any.
1463    pub fn static_layout(&self) -> Option<&StaticLayout> {
1464        self.table.static_layout.as_ref().map(|(s, _)| s)
1465    }
1466
1467    /// Encode the row referred to by `self` into a `Vec<u8>` using BSATN and then deserialize it.
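    ///
    /// A sketch (`MyRow` is a hypothetical `DeserializeOwned` type matching this table's row type):
    ///
    /// ```ignore
    /// let mut scratch = Vec::new();
    /// let typed: MyRow = row_ref.read_via_bsatn(&mut scratch)?;
    /// ```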
1468    pub fn read_via_bsatn<T>(&self, scratch: &mut Vec<u8>) -> Result<T, ReadViaBsatnError>
1469    where
1470        T: DeserializeOwned,
1471    {
1472        self.to_bsatn_extend(scratch)?;
1473        Ok(bsatn::from_slice::<T>(scratch)?)
1474    }
1475
1476    /// Return the number of bytes in the blob store to which this object holds a reference.
1477    ///
1478    /// Used to compute the table's `blob_store_bytes` when reconstructing a snapshot.
1479    ///
1480    /// Even within a single row, this is a conservative overestimate,
1481    /// as a row may contain multiple references to the same large blob.
1482    /// This seems unlikely to occur in practice.
1483    fn blob_store_bytes(&self) -> usize {
1484        let row_data = self.get_row_data();
1485        let (page, _) = self.page_and_offset();
1486        // SAFETY:
1487        // - Existence of a `RowRef` treated as proof
1488        //   of the row's validity and type information's correctness.
1489        unsafe { self.table.visitor_prog.visit_var_len(row_data) }
1490            .filter(|vlr| vlr.is_large_blob())
1491            .map(|vlr| {
1492                // SAFETY:
1493                // - Because `vlr.is_large_blob`, it points to exactly one granule.
1494                let granule = unsafe { page.iter_var_len_object(vlr.first_granule) }.next().unwrap();
1495                let blob_hash = granule.blob_hash();
1496                let blob = self.blob_store.retrieve_blob(&blob_hash).unwrap();
1497
1498                blob.len()
1499            })
1500            .sum()
1501    }
1502}
1503
1504impl Serialize for RowRef<'_> {
1505    fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
1506        let table = self.table;
1507        let (page, offset) = table.page_and_offset(self.pointer);
1508        // SAFETY: `self.pointer` points to a valid row in this table, as witnessed by the existence of this `RowRef`.
1509        unsafe { serialize_row_from_page(ser, page, self.blob_store, offset, &table.row_layout) }
1510    }
1511}
1512
1513impl ToBsatn for RowRef<'_> {
1514    /// BSATN-encode the row referred to by `self` into a freshly-allocated `Vec<u8>`.
1515    ///
1516    /// This method will use a [`StaticLayout`] if one is available,
1517    /// and may therefore be faster than calling [`bsatn::to_vec`].
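    ///
    /// A sketch (assuming `row_ref` was obtained from a scan or an index seek):
    ///
    /// ```ignore
    /// let bytes: Vec<u8> = row_ref.to_bsatn_vec()?;
    /// ```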
1518    fn to_bsatn_vec(&self) -> Result<Vec<u8>, BsatnError> {
1519        if let Some(static_layout) = self.static_layout() {
1520            // Use fast path, by first fetching the row data and then using the static layout.
1521            let row = self.get_row_data();
1522            // SAFETY:
1523            // - Existence of a `RowRef` treated as proof
1524            //   of row's validity and type information's correctness.
1525            Ok(unsafe { static_layout.serialize_row_into_vec(row) })
1526        } else {
1527            bsatn::to_vec(self)
1528        }
1529    }
1530
1531    /// BSATN-encode the row referred to by `self` into `buf`,
1532    /// pushing `self`'s bytes onto the end of `buf`, similar to [`Vec::extend`].
1533    ///
1534    /// This method will use a [`StaticLayout`] if one is available,
1535    /// and may therefore be faster than calling [`bsatn::to_writer`].
1536    fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> Result<(), BsatnError> {
1537        if let Some(static_layout) = self.static_layout() {
1538            // Use fast path, by first fetching the row data and then using the static layout.
1539            let row = self.get_row_data();
1540            // SAFETY:
1541            // - Existence of a `RowRef` treated as proof
1542            //   of row's validity and type information's correctness.
1543            unsafe {
1544                static_layout.serialize_row_extend(buf, row);
1545            }
1546            Ok(())
1547        } else {
1548            // Use the slower, but more general, `bsatn_from` serializer to write the row.
1549            bsatn::to_writer(buf, self)
1550        }
1551    }
1552
1553    fn static_bsatn_size(&self) -> Option<u16> {
1554        self.static_layout().map(|sl| sl.bsatn_length)
1555    }
1556}
1557
1558impl Eq for RowRef<'_> {}
1559impl PartialEq for RowRef<'_> {
1560    fn eq(&self, other: &Self) -> bool {
1561        // Ensure that the layouts are the same
1562        // so that we can use `eq_row_in_page`.
1563        // To do this, we first try address equality on the layouts.
1564        // This should succeed when the rows originate from the same table.
1565        // Otherwise, actually compare the layouts, which is expensive, but unlikely to happen.
1566        let a_ty = self.row_layout();
1567        let b_ty = other.row_layout();
1568        if !(ptr::eq(a_ty, b_ty) || a_ty == b_ty) {
1569            return false;
1570        }
1571        let (page_a, offset_a) = self.page_and_offset();
1572        let (page_b, offset_b) = other.page_and_offset();
1573        let static_layout = self.static_layout();
1574        // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty`
1575        // and `static_layout` is derived from `a_ty`.
1576        unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) }
1577    }
1578}
1579
1580impl PartialEq<ProductValue> for RowRef<'_> {
1581    fn eq(&self, rhs: &ProductValue) -> bool {
1582        let ty = self.row_layout();
1583        let (page, offset) = self.page_and_offset();
1584        // SAFETY: By having `RowRef`,
1585        // we know that `offset` is a valid offset for a row in `page` typed at `ty`.
1586        unsafe { eq_row_in_page_to_pv(self.blob_store, page, offset, rhs, ty) }
1587    }
1588}
1589
1590impl Hash for RowRef<'_> {
1591    fn hash<H: Hasher>(&self, state: &mut H) {
1592        let (page, offset) = self.table.page_and_offset(self.pointer);
1593        let ty = &self.table.row_layout;
1594        // SAFETY: A `RowRef` is a proof that `self.pointer` refers to a live fixed row in `self.table`, so:
1595        // 1. `offset` points at a row in `page` lasting `ty.size()` bytes.
1596        // 2. the row is valid for `ty`.
1597        // 3. for any `vlr: VarLenRef` stored in the row,
1598        //    `vlr.first_offset` is either `NULL` or points to a valid granule in `page`.
1599        unsafe { hash_row_in_page(state, page, self.blob_store, offset, ty) };
1600    }
1601}
1602
1603/// An iterator over all the rows, yielded as [`RowRef`]s, in a table.
1604pub struct TableScanIter<'table> {
1605    /// The current page we're yielding rows from.
1606    /// When `None`, the iterator will attempt to advance to the next page, if any.
1607    current_page: Option<FixedLenRowsIter<'table>>,
1608    /// The current page index we are or will visit.
1609    current_page_idx: PageIndex,
1610    /// The table the iterator is yielding rows from.
1611    pub(crate) table: &'table Table,
1612    /// The `BlobStore` that row references may refer into.
1613    pub(crate) blob_store: &'table dyn BlobStore,
1614}
1615
1616impl<'a> Iterator for TableScanIter<'a> {
1617    type Item = RowRef<'a>;
1618
1619    fn next(&mut self) -> Option<Self::Item> {
1620        // This could have been written using `.flat_map`,
1621        // but we don't have `type Foo = impl Iterator<...>;` on stable yet.
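        //
        // An illustrative sketch of that shape (not compiled here), matching the
        // `flat_map`-based reference implementation in this module's tests:
        //
        //     pages.iter()
        //         .zip((0..).map(PageIndex))
        //         .flat_map(|(page, pi)| page.iter_fixed_len(row_size)
        //             .map(move |offset| /* RowRef for (pi, offset) */))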
1622        loop {
1623            match &mut self.current_page {
1624                // We're currently visiting a page,
1625                Some(iter_fixed_len) => {
1626                    if let Some(page_offset) = iter_fixed_len.next() {
1627                        // There's still at least one row in that page to visit,
1628                        // return a ref to that row.
1629                        let ptr =
1630                            RowPointer::new(false, self.current_page_idx, page_offset, self.table.squashed_offset);
1631
1632                        // SAFETY: `page_offset` came from `iter_fixed_len`, so it must point to a valid row.
1633                        let row_ref = unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) };
1634                        return Some(row_ref);
1635                    } else {
1636                        // We've finished visiting that page, so set `current_page` to `None`,
1637                        // increment `self.current_page_idx` to the index of the next page,
1638                        // and go to the `None` case (1) in the match.
1639                        self.current_page = None;
1640                        self.current_page_idx.0 += 1;
1641                    }
1642                }
1643
1644                // (1) If we aren't currently visiting a page,
1645                // the `else` case in the `Some` match arm
1646                // already incremented `self.current_page_idx`,
1647                // or we're just beginning and so it was initialized as 0.
1648                None => {
1649                    // If there's another page, set `self.current_page` to it,
1650                    // and go to the `Some` case in the match.
1651                    let next_page = self.table.pages().get(self.current_page_idx.idx())?;
1652                    let iter = next_page.iter_fixed_len(self.table.row_size());
1653                    self.current_page = Some(iter);
1654                }
1655            }
1656        }
1657    }
1658}
1659
1660/// A combined table and index,
1661/// allowing direct extraction of an [`IndexScanPointIter`] or [`IndexScanRangeIter`].
1662#[derive(Copy, Clone)]
1663pub struct TableAndIndex<'a> {
1664    table: &'a Table,
1665    blob_store: &'a dyn BlobStore,
1666    index: &'a TableIndex,
1667}
1668
1669impl<'a> TableAndIndex<'a> {
1670    pub fn table(&self) -> &'a Table {
1671        self.table
1672    }
1673
1674    pub fn index(&self) -> &'a TableIndex {
1675        self.index
1676    }
1677
1678    /// Returns an iterator yielding all rows in this index for `key`.
1679    ///
1680    /// Matching is defined by `Ord for AlgebraicValue`.
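    ///
    /// A sketch (assuming an index `index_id` over a `u64` column, with `table` and `blob_store` in scope):
    ///
    /// ```ignore
    /// if let Some(tai) = table.get_index_by_id_with_table(&blob_store, index_id) {
    ///     for row_ref in tai.seek_point(&AlgebraicValue::U64(42)) {
    ///         // Every yielded row has 42 in the indexed column.
    ///     }
    /// }
    /// ```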
1681    pub fn seek_point(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
1682        IndexScanPointIter {
1683            table: self.table,
1684            blob_store: self.blob_store,
1685            btree_index_iter: self.index.seek_point(key),
1686        }
1687    }
1688
1689    /// Returns an iterator yielding all rows in this index that fall within `range`.
1690    ///
1691    /// Matching is defined by `Ord for AlgebraicValue`.
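    ///
    /// A sketch (`tai` is the [`TableAndIndex`] from the [`Self::seek_point`] sketch, over the same `u64` column):
    ///
    /// ```ignore
    /// let (lo, hi) = (AlgebraicValue::U64(10), AlgebraicValue::U64(20));
    /// for row_ref in tai.seek_range(&(lo..=hi)) {
    ///     // Rows whose indexed column falls in 10..=20.
    /// }
    /// ```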
1692    pub fn seek_range(&self, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanRangeIter<'a> {
1693        IndexScanRangeIter {
1694            table: self.table,
1695            blob_store: self.blob_store,
1696            btree_index_iter: self.index.seek_range(range),
1697        }
1698    }
1699}
1700
1701/// An iterator using a [`TableIndex`] to scan a `table`
1702/// for all the [`RowRef`]s matching the specified `key` in the indexed column(s).
1703///
1704/// Matching is defined by `Ord for AlgebraicValue`.
1705pub struct IndexScanPointIter<'a> {
1706    /// The table being scanned for rows.
1707    table: &'a Table,
1708    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1709    blob_store: &'a dyn BlobStore,
1710    /// The iterator performing the index scan yielding row pointers.
1711    btree_index_iter: TableIndexPointIter<'a>,
1712}
1713
1714impl<'a> Iterator for IndexScanPointIter<'a> {
1715    type Item = RowRef<'a>;
1716
1717    fn next(&mut self) -> Option<Self::Item> {
1718        let ptr = self.btree_index_iter.next()?;
1719        // FIXME: Determine if this is correct and if so use `_unchecked`.
1720        // Will a table's index necessarily hold only pointers into that table?
1721        // Edge case: if an index is added during a transaction which then scans that index,
1722        // it appears that the newly-created `TxState` index
1723        // will also hold pointers into the `CommittedState`.
1724        //
1725        // SAFETY: Assuming this is correct,
1726        // `ptr` came from the index, which always holds pointers to valid rows.
1727        self.table.get_row_ref(self.blob_store, ptr)
1728    }
1729}
1730
1731/// An iterator using a [`TableIndex`] to scan a `table`
1732/// for all the [`RowRef`]s matching the specified `range` in the indexed column(s).
1733///
1734/// Matching is defined by `Ord for AlgebraicValue`.
1735pub struct IndexScanRangeIter<'a> {
1736    /// The table being scanned for rows.
1737    table: &'a Table,
1738    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1739    blob_store: &'a dyn BlobStore,
1740    /// The iterator performing the index scan yielding row pointers.
1741    btree_index_iter: TableIndexRangeIter<'a>,
1742}
1743
1744impl<'a> Iterator for IndexScanRangeIter<'a> {
1745    type Item = RowRef<'a>;
1746
1747    fn next(&mut self) -> Option<Self::Item> {
1748        let ptr = self.btree_index_iter.next()?;
1749        // FIXME: Determine if this is correct and if so use `_unchecked`.
1750        // Will a table's index necessarily hold only pointers into that table?
1751        // Edge case: if an index is added during a transaction which then scans that index,
1752        // it appears that the newly-created `TxState` index
1753        // will also hold pointers into the `CommittedState`.
1754        //
1755        // SAFETY: Assuming this is correct,
1756        // `ptr` came from the index, which always holds pointers to valid rows.
1757        self.table.get_row_ref(self.blob_store, ptr)
1758    }
1759}
1760
1761#[derive(Error, Debug, PartialEq, Eq)]
1762#[error("Unique constraint violation '{}' in table '{}': column(s): '{:?}' value: {}", constraint_name, table_name, cols, value.to_satn())]
1763pub struct UniqueConstraintViolation {
1764    pub constraint_name: Box<str>,
1765    pub table_name: Box<str>,
1766    pub cols: Vec<Box<str>>,
1767    pub value: AlgebraicValue,
1768}
1769
1770impl UniqueConstraintViolation {
1771    /// Returns a unique constraint violation error for the given `index`
1772    /// and the `value` that would have been duplicated.
1773    #[cold]
1774    fn build(schema: &TableSchema, index: &TableIndex, index_id: IndexId, value: AlgebraicValue) -> Self {
1775        // Fetch the table name.
1776        let table_name = schema.table_name.clone();
1777
1778        // Fetch the names of the columns used in the index.
1779        let cols = index
1780            .indexed_columns
1781            .iter()
1782            .map(|x| schema.columns()[x.idx()].col_name.clone())
1783            .collect();
1784
1785        // Fetch the name of the index.
1786        let constraint_name = schema
1787            .indexes
1788            .iter()
1789            .find(|i| i.index_id == index_id)
1790            .unwrap()
1791            .index_name
1792            .clone();
1793
1794        Self {
1795            constraint_name,
1796            table_name,
1797            cols,
1798            value,
1799        }
1800    }
1801}
1802
1803// Private API:
1804impl Table {
1805    /// Returns a unique constraint violation error for the given `index`
1806    /// and the `value` that would have been duplicated.
1807    #[cold]
1808    pub fn build_error_unique(
1809        &self,
1810        index: &TableIndex,
1811        index_id: IndexId,
1812        value: AlgebraicValue,
1813    ) -> UniqueConstraintViolation {
1814        let schema = self.get_schema();
1815        UniqueConstraintViolation::build(schema, index, index_id, value)
1816    }
1817
1818    /// Returns a new empty table using the particulars passed.
1819    fn new_with_indexes_capacity(
1820        schema: Arc<TableSchema>,
1821        row_layout: RowTypeLayout,
1822        static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
1823        visitor_prog: VarLenVisitorProgram,
1824        squashed_offset: SquashedOffset,
1825        pointer_map: Option<PointerMap>,
1826    ) -> Self {
1827        Self {
1828            inner: TableInner {
1829                row_layout,
1830                static_layout,
1831                visitor_prog,
1832                pages: Pages::default(),
1833            },
1834            is_scheduler: schema.schedule.is_some(),
1835            schema,
1836            indexes: BTreeMap::new(),
1837            pointer_map,
1838            squashed_offset,
1839            row_count: 0,
1840            blob_store_bytes: BlobNumBytes::default(),
1841        }
1842    }
1843
1844    /// Returns whether the row at `ptr` is present or not.
1845    // TODO: Remove all uses of this method,
1846    //       or more likely, gate them behind `debug_assert!`
1847    //       so they don't have semantic meaning.
1848    //
1849    //       Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
1850    //       `RowPointer` is not content-addressed.
1851    //       This means it is possible to:
1852    //       - have a `RowPointer` A* to row A,
1853    //       - Delete row A,
1854    //       - Insert row B into the same storage as freed from A,
1855    //       - Test `is_row_present(A*)`, which falsely reports that row A is still present.
1856    //
1857    //       In the final interface, this method is superfluous anyways,
1858    //       as `RowPointer` is not part of our public interface.
1859    //       Instead, we will always discover a known-present `RowPointer`
1860    //       during a table scan or index seek.
1861    //       As such, our `delete` and `insert` methods can be `unsafe`
1862    //       and trust that the `RowPointer` is valid.
1863    fn is_row_present(&self, ptr: RowPointer) -> bool {
1864        if self.squashed_offset != ptr.squashed_offset() {
1865            return false;
1866        }
1867        let Some((page, offset)) = self.inner.try_page_and_offset(ptr) else {
1868            return false;
1869        };
1870        page.has_row_offset(self.row_size(), offset)
1871    }
1872
1873    /// Returns the row size for a row in the table.
1874    pub fn row_size(&self) -> Size {
1875        self.row_layout().size()
1876    }
1877
1878    /// Returns the layout for a row in the table.
1879    fn row_layout(&self) -> &RowTypeLayout {
1880        &self.inner.row_layout
1881    }
1882
1883    /// Returns the pages storing the physical rows of this table.
1884    fn pages(&self) -> &Pages {
1885        &self.inner.pages
1886    }
1887
1888    /// Iterates over each [`Page`] in this table, ensuring that its hash is computed before yielding it.
1889    ///
1890    /// Used when capturing a snapshot.
1891    pub fn iter_pages_with_hashes(&mut self) -> impl Iterator<Item = (blake3::Hash, &Page)> {
1892        self.inner.pages.iter_mut().map(|page| {
1893            let hash = page.save_or_get_content_hash();
1894            (hash, &**page)
1895        })
1896    }
1897
1898    /// Returns the number of pages storing the physical rows of this table.
1899    fn num_pages(&self) -> usize {
1900        self.inner.pages.len()
1901    }
1902
1903    /// Returns the [`StaticLayout`] for this table, if any.
1904    pub(crate) fn static_layout(&self) -> Option<&StaticLayout> {
1905        self.inner.static_layout.as_ref().map(|(s, _)| s)
1906    }
1907
1908    /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them.
1909    ///
1910    /// Called when restoring from a snapshot, after installing the pages
1911    /// and after computing the row count,
1912    /// since snapshots do not save the pointer map.
1913    fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) {
1914        // TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`.
1915        // Alternatively, do this at the same time as `compute_row_count`.
1916        let ptrs = self
1917            .scan_rows(blob_store)
1918            .map(|row_ref| (row_ref.row_hash(), row_ref.pointer()))
1919            .collect::<PointerMap>();
1920        self.pointer_map = Some(ptrs);
1921    }
1922
1923    /// Compute and store `self.row_count` and `self.blob_store_bytes`
1924    /// by iterating over all the rows in `self` and counting them.
1925    ///
1926    /// Called when restoring from a snapshot after installing the pages,
1927    /// since snapshots do not save this metadata.
1928    fn compute_row_count(&mut self, blob_store: &dyn BlobStore) {
1929        let mut row_count = 0;
1930        let mut blob_store_bytes = 0;
1931        for row in self.scan_rows(blob_store) {
1932            row_count += 1;
1933            blob_store_bytes += row.blob_store_bytes();
1934        }
1935        self.row_count = row_count as u64;
1936        self.blob_store_bytes = blob_store_bytes.into();
1937    }
1938}
1939
1940#[cfg(test)]
1941pub(crate) mod test {
1942    use super::*;
1943    use crate::blob_store::{HashMapBlobStore, NullBlobStore};
1944    use crate::page::tests::hash_unmodified_save_get;
1945    use crate::var_len::VarLenGranule;
1946    use proptest::prelude::*;
1947    use proptest::test_runner::TestCaseResult;
1948    use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder};
1949    use spacetimedb_primitives::{col_list, TableId};
1950    use spacetimedb_sats::bsatn::to_vec;
1951    use spacetimedb_sats::proptest::{generate_typed_row, generate_typed_row_vec};
1952    use spacetimedb_sats::{product, AlgebraicType, ArrayValue};
1953    use spacetimedb_schema::def::{BTreeAlgorithm, ModuleDef};
1954    use spacetimedb_schema::schema::Schema as _;
1955
1956    /// Create a `Table` from a `ProductType` without validation.
1957    pub(crate) fn table(ty: ProductType) -> Table {
1958        // Use a fast path here to avoid slowing down Miri in the proptests.
1959        // Does not perform validation.
1960        let schema = TableSchema::from_product_type(ty);
1961        Table::new(schema.into(), SquashedOffset::COMMITTED_STATE)
1962    }
1963
1964    #[test]
1965    fn unique_violation_error() {
1966        let table_name = "UniqueIndexed";
1967        let index_name = "UniqueIndexed_unique_col_idx_btree";
1968        let mut builder = RawModuleDefV9Builder::new();
1969        builder
1970            .build_table_with_new_type(
1971                table_name,
1972                ProductType::from([("unique_col", AlgebraicType::I32), ("other_col", AlgebraicType::I32)]),
1973                true,
1974            )
1975            .with_unique_constraint(0)
1976            .with_index(
1977                RawIndexAlgorithm::BTree { columns: col_list![0] },
1978                "accessor_name_doesnt_matter",
1979            );
1980
1981        let def: ModuleDef = builder.finish().try_into().expect("Failed to build schema");
1982
1983        let schema = TableSchema::from_module_def(&def, def.table(table_name).unwrap(), (), TableId::SENTINEL);
1984        assert_eq!(schema.indexes.len(), 1);
1985        let index_schema = schema.indexes[0].clone();
1986
1987        let mut table = Table::new(schema.into(), SquashedOffset::COMMITTED_STATE);
1988        let cols = ColList::new(0.into());
1989        let algo = BTreeAlgorithm { columns: cols.clone() }.into();
1990
1991        let index = table.new_index(&algo, true).unwrap();
1992        // SAFETY: Index was derived from `table`.
1993        unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) };
1994
1995        // Reserve a page so that we can check the hash.
1996        let pi = table.inner.pages.reserve_empty_page(table.row_size()).unwrap();
1997        let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
1998
1999        // Insert the row (0, 0).
2000        table
2001            .insert(&mut NullBlobStore, &product![0i32, 0i32])
2002            .expect("Initial insert failed");
2003
2004        // Inserting cleared the hash.
2005        let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
2006        assert_ne!(hash_pre_ins, hash_post_ins);
2007
2008        // Try to insert the row (0, 1), and assert that we get the expected error.
2009        match table.insert(&mut NullBlobStore, &product![0i32, 1i32]) {
2010            Ok(_) => panic!("Second insert with same unique value succeeded"),
2011            Err(InsertError::IndexError(UniqueConstraintViolation {
2012                constraint_name,
2013                table_name,
2014                cols,
2015                value,
2016            })) => {
2017                assert_eq!(&*constraint_name, index_name);
2018                assert_eq!(&*table_name, "UniqueIndexed");
2019                assert_eq!(cols.iter().map(|c| c.to_string()).collect::<Vec<_>>(), &["unique_col"]);
2020                assert_eq!(value, AlgebraicValue::I32(0));
2021            }
2022            Err(e) => panic!("Expected UniqueConstraintViolation but found {:?}", e),
2023        }
2024
2025        // The second insert cleared the hash even though it hit a constraint violation,
2026        // as constraint checking is done after insertion and then rolled back.
2027        assert_eq!(table.inner.pages[pi].unmodified_hash(), None);
2028    }
2029
2030    fn insert_retrieve_body(ty: impl Into<ProductType>, val: impl Into<ProductValue>) -> TestCaseResult {
2031        let val = val.into();
2032        let mut blob_store = HashMapBlobStore::default();
2033        let mut table = table(ty.into());
2034        let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2035        let hash = hash.unwrap();
2036        prop_assert_eq!(row.row_hash(), hash);
2037        let ptr = row.pointer();
2038        prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2039
2040        prop_assert_eq!(table.inner.pages.len(), 1);
2041        prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2042
2043        let row_ref = table.get_row_ref(&blob_store, ptr).unwrap();
2044        prop_assert_eq!(row_ref.to_product_value(), val.clone());
2045        let bsatn_val = to_vec(&val).unwrap();
2046        prop_assert_eq!(&bsatn_val, &to_vec(&row_ref).unwrap());
2047        prop_assert_eq!(&bsatn_val, &row_ref.to_bsatn_vec().unwrap());
2048
2049        prop_assert_eq!(
2050            &table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(),
2051            &[ptr]
2052        );
2053
2054        Ok(())
2055    }
2056
2057    #[test]
2058    fn repro_serialize_bsatn_empty_array() {
2059        let ty = AlgebraicType::array(AlgebraicType::U64);
2060        let arr = ArrayValue::from(Vec::<u64>::new().into_boxed_slice());
2061        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2062    }
2063
2064    #[test]
2065    fn repro_serialize_bsatn_debug_assert() {
2066        let ty = AlgebraicType::array(AlgebraicType::U64);
2067        let arr = ArrayValue::from((0..130u64).collect::<Box<_>>());
2068        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2069    }
2070
2071    fn reconstruct_index_num_key_bytes(table: &Table, blob_store: &dyn BlobStore, index_id: IndexId) -> u64 {
2072        let index = table.get_index_by_id(index_id).unwrap();
2073
2074        index
2075            .seek_range(&(..))
2076            .map(|row_ptr| {
2077                let row_ref = table.get_row_ref(blob_store, row_ptr).unwrap();
2078                let key = row_ref.project(&index.indexed_columns).unwrap();
2079                crate::table_index::KeySize::key_size_in_bytes(&key) as u64
2080            })
2081            .sum()
2082    }
2083
2084    /// Given a row type `ty`, a set of rows of that type `vals`,
2085    /// and a set of columns within that type `indexed_columns`,
2086    /// populate a table with `vals`, add an index on the `indexed_columns`,
2087    /// and perform various assertions that the reported index size metrics are correct.
2088    fn test_index_size_reporting(
2089        ty: ProductType,
2090        vals: Vec<ProductValue>,
2091        indexed_columns: ColList,
2092    ) -> Result<(), TestCaseError> {
2093        let mut blob_store = HashMapBlobStore::default();
2094        let mut table = table(ty.clone());
2095
2096        for row in &vals {
2097            prop_assume!(table.insert(&mut blob_store, row).is_ok());
2098        }
2099
2100        // We haven't added any indexes yet, so there should be 0 rows in indexes.
2101        prop_assert_eq!(table.num_rows_in_indexes(), 0);
2102
2103        let index_id = IndexId(0);
2104
2105        let algo = BTreeAlgorithm {
2106            columns: indexed_columns.clone(),
2107        }
2108        .into();
2109        let index = TableIndex::new(&ty, &algo, false).unwrap();
2110        // Add an index on `indexed_columns`.
2111        // Safety:
2112        // We're using `ty` as the row type for both `table` and the new index.
2113        unsafe { table.insert_index(&blob_store, index_id, index) };
2114
2115        // We have one index, which should be fully populated,
2116        // so in total we should have the same number of rows in indexes as we have rows.
2117        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows());
2118
2119        let index = table.get_index_by_id(index_id).unwrap();
2120
2121        // One index, so table's reporting of bytes used should match that index's reporting.
2122        prop_assert_eq!(table.bytes_used_by_index_keys(), index.num_key_bytes());
2123
2124        // Walk all the rows in the index, sum their key size,
2125        // and assert it matches the `index.num_key_bytes()`
2126        prop_assert_eq!(
2127            index.num_key_bytes(),
2128            reconstruct_index_num_key_bytes(&table, &blob_store, index_id)
2129        );
2130
2131        // Walk all the rows we inserted, project them to the cols that will be their keys,
2132        // sum their key size,
2133        // and assert it matches the `index.num_key_bytes()`
2134        let key_size_in_pvs = vals
2135            .iter()
2136            .map(|row| crate::table_index::KeySize::key_size_in_bytes(&row.project(&indexed_columns).unwrap()) as u64)
2137            .sum();
2138        prop_assert_eq!(index.num_key_bytes(), key_size_in_pvs);
2139
2140        let algo = BTreeAlgorithm {
2141            columns: indexed_columns,
2142        }
2143        .into();
2144        let index = TableIndex::new(&ty, &algo, false).unwrap();
2145        // Add a duplicate of the same index, so we can check that all above quantities double.
2146        // Safety:
2147        // As above, we're using `ty` as the row type for both `table` and the new index.
2148        unsafe { table.insert_index(&blob_store, IndexId(1), index) };
2149
2150        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2);
2151        prop_assert_eq!(table.bytes_used_by_index_keys(), key_size_in_pvs * 2);
2152
2153        Ok(())
2154    }
2155
2156    proptest! {
2157        #![proptest_config(ProptestConfig { max_shrink_iters: 0x10000000, ..Default::default() })]
2158
2159        #[test]
2160        fn insert_retrieve((ty, val) in generate_typed_row()) {
2161            insert_retrieve_body(ty, val)?;
2162        }
2163
2164        #[test]
2165        fn insert_delete_removed_from_pointer_map((ty, val) in generate_typed_row()) {
2166            let mut blob_store = HashMapBlobStore::default();
2167            let mut table = table(ty);
2168            let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2169            let hash = hash.unwrap();
2170            prop_assert_eq!(row.row_hash(), hash);
2171            let ptr = row.pointer();
2172            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2173
2174            prop_assert_eq!(table.inner.pages.len(), 1);
2175            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2176            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2177            prop_assert_eq!(table.row_count, 1);
2178
2179            let hash_pre_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2180
2181            table.delete(&mut blob_store, ptr, |_| ());
2182
2183            let hash_post_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2184            assert_ne!(hash_pre_del, hash_post_del);
2185
2186            prop_assert_eq!(table.pointers_for(hash), &[]);
2187
2188            prop_assert_eq!(table.inner.pages.len(), 1);
2189            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 0);
2190            prop_assert_eq!(table.row_count, 0);
2191
2192            prop_assert!(&table.scan_rows(&blob_store).next().is_none());
2193        }
2194
2195        #[test]
2196        fn insert_duplicate_set_semantic((ty, val) in generate_typed_row()) {
2197            let mut blob_store = HashMapBlobStore::default();
2198            let mut table = table(ty);
2199
2200            let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2201            let hash = hash.unwrap();
2202            prop_assert_eq!(row.row_hash(), hash);
2203            let ptr = row.pointer();
2204            prop_assert_eq!(table.inner.pages.len(), 1);
2205            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2206            prop_assert_eq!(table.row_count, 1);
2207            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2208
2209            let blob_uses = blob_store.usage_counter();
2210
2211            let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2212
2213            prop_assert!(table.insert(&mut blob_store, &val).is_err());
2214
2215            // Hash was cleared and is different despite failure to insert.
2216            let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2217            assert_ne!(hash_pre_ins, hash_post_ins);
2218
2219            prop_assert_eq!(table.row_count, 1);
2220            prop_assert_eq!(table.inner.pages.len(), 1);
2221            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2222
2223            let blob_uses_after = blob_store.usage_counter();
2224
2225            prop_assert_eq!(blob_uses_after, blob_uses);
2226            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2227            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2228        }
2229
2230        #[test]
2231        fn insert_bsatn_same_as_pv((ty, val) in generate_typed_row()) {
2232            let mut bs_pv = HashMapBlobStore::default();
2233            let mut table_pv = table(ty.clone());
2234            let res_pv = table_pv.insert(&mut bs_pv, &val);
2235
2236            let mut bs_bsatn = HashMapBlobStore::default();
2237            let mut table_bsatn = table(ty);
2238            let res_bsatn = insert_bsatn(&mut table_bsatn, &mut bs_bsatn, &val);
2239
2240            prop_assert_eq!(res_pv, res_bsatn);
2241            prop_assert_eq!(bs_pv, bs_bsatn);
2242            prop_assert_eq!(table_pv, table_bsatn);
2243        }
2244
2245        #[test]
2246        fn row_size_reporting_matches_slow_implementations((ty, vals) in generate_typed_row_vec(128, 2048)) {
2247            let mut blob_store = HashMapBlobStore::default();
2248            let mut table = table(ty.clone());
2249
2250            for row in &vals {
2251                prop_assume!(table.insert(&mut blob_store, row).is_ok());
2252            }
2253
2254            prop_assert_eq!(table.bytes_used_by_rows(), table.reconstruct_bytes_used_by_rows());
2255            prop_assert_eq!(table.num_rows(), table.reconstruct_num_rows());
2256            prop_assert_eq!(table.num_rows(), vals.len() as u64);
2257
2258            // TODO(testing): Determine if there's a meaningful way to test that the blob store reporting is correct.
2259            // I (pgoldman 2025-01-27) doubt it, as the test would be "visit every blob and sum their size,"
2260            // which is already what the actual implementation does.
2261        }
2262
2263        #[test]
2264        fn index_size_reporting_matches_slow_implementations_single_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2265            prop_assume!(!ty.elements.is_empty());
2266
2267            test_index_size_reporting(ty, vals, ColList::from(ColId(0)))?;
2268        }
2269
2270        #[test]
2271        fn index_size_reporting_matches_slow_implementations_two_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2272            prop_assume!(ty.elements.len() >= 2);
2273
2275            test_index_size_reporting(ty, vals, ColList::from([ColId(0), ColId(1)]))?;
2276        }
2277    }
2278
2279    fn insert_bsatn<'a>(
2280        table: &'a mut Table,
2281        blob_store: &'a mut dyn BlobStore,
2282        val: &ProductValue,
2283    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
2284        let row = &to_vec(&val).unwrap();
2285
2286        // Optimistically insert the `row` before checking any constraints
2287        // under the assumption that errors (unique constraint & set semantic violations) are rare.
2288        let (row_ref, blob_bytes) = table.insert_physically_bsatn(blob_store, row)?;
2289        let row_ptr = row_ref.pointer();
2290
2291        // Confirm the insertion, checking any constraints, removing the physical row on error.
2292        // SAFETY: We just inserted `ptr`, so it must be present.
2293        let (hash, row_ptr) = unsafe { table.confirm_insertion::<true>(blob_store, row_ptr, blob_bytes) }?;
2294        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
2295        let row_ref = unsafe { table.inner.get_row_ref_unchecked(blob_store, row_ptr) };
2296        Ok((hash, row_ref))
2297    }
2298
2299    // Compare `scan_rows` against a simpler implementation.
2300    #[test]
2301    fn table_scan_iter_eq_flatmap() {
2302        let mut blob_store = HashMapBlobStore::default();
2303        let mut table = table(AlgebraicType::U64.into());
2304        for v in 0..2u64.pow(14) {
2305            table.insert(&mut blob_store, &product![v]).unwrap();
2306        }
2307
2308        let complex = table.scan_rows(&blob_store).map(|r| r.pointer());
2309        let simple = table
2310            .inner
2311            .pages
2312            .iter()
2313            .zip((0..).map(PageIndex))
2314            .flat_map(|(page, pi)| {
2315                page.iter_fixed_len(table.row_size())
2316                    .map(move |po| RowPointer::new(false, pi, po, table.squashed_offset))
2317            });
2318        assert!(complex.eq(simple));
2319    }
2320
2321    #[test]
2322    #[should_panic]
2323    fn read_row_unaligned_page_offset_soundness() {
2324        // Insert a `u64` into a table.
2325        let pt = AlgebraicType::U64.into();
2326        let pv = product![42u64];
2327        let mut table = table(pt);
2328        let blob_store = &mut NullBlobStore;
2329        let (_, row_ref) = table.insert(blob_store, &pv).unwrap();
2330
2331        // Manipulate the page offset to 1 instead of 0.
2332        // This now points into the "middle" of a row.
2333        let ptr = row_ref.pointer().with_page_offset(PageOffset(1));
2334
2335        // We expect this to panic.
2336        // Miri should not have any issue with this call either.
2337        table.get_row_ref(&NullBlobStore, ptr).unwrap().to_product_value();
2338    }
2339
2340    #[test]
2341    fn test_blob_store_bytes() {
2342        let pt: ProductType = [AlgebraicType::String, AlgebraicType::I32].into();
2343        let blob_store = &mut HashMapBlobStore::default();
2344        let mut insert =
2345            |table: &mut Table, string, num| table.insert(blob_store, &product![string, num]).unwrap().1.pointer();
2346        let mut table1 = table(pt.clone());
2347
2348        // Insert short string, `blob_store_bytes` should be 0.
2349        let short_str = std::str::from_utf8(&[98; 6]).unwrap();
2350        let short_row_ptr = insert(&mut table1, short_str, 0);
2351        assert_eq!(table1.blob_store_bytes.0, 0);
2352
2353        // Insert long string, `blob_store_bytes` should be the length of the string.
2354        const BLOB_OBJ_LEN: BlobNumBytes = BlobNumBytes(VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD + 1);
2355        let long_str = std::str::from_utf8(&[98; BLOB_OBJ_LEN.0]).unwrap();
2356        let long_row_ptr = insert(&mut table1, long_str, 0);
2357        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2358
2359        // Insert previous long string in the same table,
2360        // `blob_store_bytes` should count the length twice,
2361        // even though `HashMapBlobStore` deduplicates it.
2362        let long_row_ptr2 = insert(&mut table1, long_str, 1);
2363        const BLOB_OBJ_LEN_2X: BlobNumBytes = BlobNumBytes(BLOB_OBJ_LEN.0 * 2);
2364        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2365
2366        // Insert previous long string in a new table,
2367        // `blob_store_bytes` should show the length,
2368        // even though `HashMapBlobStore` deduplicates it.
2369        let mut table2 = table(pt);
2370        let _ = insert(&mut table2, long_str, 0);
2371        assert_eq!(table2.blob_store_bytes, BLOB_OBJ_LEN);
2372
2373        // Delete `short_str` row. This should not affect the byte count.
2374        table1.delete(blob_store, short_row_ptr, |_| ()).unwrap();
2375        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2376
2377        // Delete the first long string row. This gets us down to `BLOB_OBJ_LEN` (we had 2x before).
2378        table1.delete(blob_store, long_row_ptr, |_| ()).unwrap();
2379        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2380
2381        // Delete the second long string row. This gets us down to 0 (we've now deleted 2x).
2382        table1.delete(blob_store, long_row_ptr2, |_| ()).unwrap();
2383        assert_eq!(table1.blob_store_bytes, 0.into());
2384    }
2385
2386    /// Assert that calling `get_row_ref` to get a row ref to a non-existent `RowPointer`
2387    /// does not panic.
2388    #[test]
2389    fn get_row_ref_no_panic() {
2390        let blob_store = &mut HashMapBlobStore::default();
2391        let table = table([AlgebraicType::String, AlgebraicType::I32].into());
2392
2393        // This row pointer has an incorrect `SquashedOffset`, and so does not point into `table`.
2394        assert!(table
2395            .get_row_ref(
2396                blob_store,
2397                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::TX_STATE),
2398            )
2399            .is_none());
2400
2401        // This row pointer has the correct `SquashedOffset`, but points out-of-bounds within `table`.
2402        assert!(table
2403            .get_row_ref(
2404                blob_store,
2405                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::COMMITTED_STATE),
2406            )
2407            .is_none());
2408    }
2409}