spacetimedb_table/
table.rs

1use super::{
2    bflatn_from::serialize_row_from_page,
3    bflatn_to::{write_row_to_pages, write_row_to_pages_bsatn, Error},
4    blob_store::BlobStore,
5    eq::eq_row_in_page,
6    eq_to_pv::eq_row_in_page_to_pv,
7    indexes::{Bytes, PageIndex, PageOffset, RowHash, RowPointer, Size, SquashedOffset, PAGE_DATA_SIZE},
8    layout::{AlgebraicTypeLayout, RowTypeLayout},
9    page::{FixedLenRowsIter, Page},
10    page_pool::PagePool,
11    pages::Pages,
12    pointer_map::PointerMap,
13    read_column::{ReadColumn, TypeError},
14    row_hash::hash_row_in_page,
15    row_type_visitor::{row_type_visitor, VarLenVisitorProgram},
16    static_assert_size,
17    static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator},
18    static_layout::StaticLayout,
19    table_index::{TableIndex, TableIndexPointIter, TableIndexRangeIter},
20    var_len::VarLenMembers,
21    MemoryUsage,
22};
23use core::ops::RangeBounds;
24use core::{fmt, ptr};
25use core::{
26    hash::{Hash, Hasher},
27    hint::unreachable_unchecked,
28};
29use derive_more::{Add, AddAssign, From, Sub, SubAssign};
30use enum_as_inner::EnumAsInner;
31use smallvec::SmallVec;
32use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned};
33use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId};
34use spacetimedb_sats::{
35    algebraic_value::ser::ValueSerializer,
36    bsatn::{self, ser::BsatnError, ToBsatn},
37    i256,
38    product_value::InvalidFieldError,
39    satn::Satn,
40    ser::{Serialize, Serializer},
41    u256, AlgebraicValue, ProductType, ProductValue,
42};
43use spacetimedb_schema::{
44    def::IndexAlgorithm,
45    schema::{IndexSchema, TableSchema},
46    type_for_generate::PrimitiveType,
47};
48use std::{
49    collections::{btree_map, BTreeMap},
50    sync::Arc,
51};
52use thiserror::Error;
53
54/// The number of bytes used by, added to, or removed from a [`Table`]'s share of a [`BlobStore`].
55#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign, SubAssign)]
56pub struct BlobNumBytes(usize);
57
58impl MemoryUsage for BlobNumBytes {}
59
60pub type SeqIdList = SmallVec<[SequenceId; 4]>;
61static_assert_size!(SeqIdList, 24);
62
63/// A database table containing the row schema, the rows, and indices.
64///
65/// The table stores its rows in a page manager
66/// and uses an internal map to ensure that no identical row is stored more than once.
67#[derive(Debug, PartialEq, Eq)]
68pub struct Table {
69    /// Page manager and row layout grouped together, for `RowRef` purposes.
70    inner: TableInner,
71    /// Maps `RowHash -> [RowPointer]` where a [`RowPointer`] points into `pages`.
72    /// A [`PointerMap`] is effectively a specialized unique index on all the columns.
73    ///
74    /// In tables without any other unique constraints,
75    /// the pointer map is used to enforce set semantics,
76    /// i.e. to prevent duplicate rows.
77    /// If `self.indexes` contains at least one unique index,
78    /// duplicate rows are impossible regardless, so this will be `None`.
79    pointer_map: Option<PointerMap>,
80    /// The indices associated with a set of columns of the table.
81    pub indexes: BTreeMap<IndexId, TableIndex>,
82    /// The schema of the table, from which the type, and other details are derived.
83    pub schema: Arc<TableSchema>,
84    /// `SquashedOffset::TX_STATE` or `SquashedOffset::COMMITTED_STATE`
85    /// depending on whether this is a tx scratchpad table
86    /// or a committed table.
87    squashed_offset: SquashedOffset,
88    /// Stores number of rows present in table.
89    pub row_count: u64,
90    /// Stores the sum total number of bytes that each blob object in the table occupies.
91    ///
92    /// Note that the [`HashMapBlobStore`] does ref-counting and de-duplication,
93    /// but this sum will count an object each time its hash is mentioned, rather than just once.
94    blob_store_bytes: BlobNumBytes,
95    /// Indicates whether this is a scheduler table or not.
96    ///
97    /// This is an optimization to avoid checking the schema in e.g., `InstanceEnv::{insert, update}`.
98    is_scheduler: bool,
99}
100
101/// The part of a `Table` concerned only with storing rows.
102///
103/// Separated from the "outer" parts of `Table`, especially the `indexes`,
104/// so that `RowRef` can borrow only the `TableInner`,
105/// while other mutable references to the `indexes` exist.
106/// This is necessary because index insertions and deletions take a `RowRef` as an argument,
107/// from which they [`ReadColumn::read_column`] their keys.
108#[derive(Debug, PartialEq, Eq)]
109pub(crate) struct TableInner {
110    /// The type of rows this table stores, with layout information included.
111    row_layout: RowTypeLayout,
112    /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion,
113    /// if the [`RowTypeLayout`] has a static BSATN length and layout.
114    ///
115    /// A [`StaticBsatnValidator`] is also included.
116    /// It's used to validate BSATN-encoded rows before converting to BFLATN.
117    static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
118    /// The visitor program for `row_layout`.
119    ///
120    /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it.
121    visitor_prog: VarLenVisitorProgram,
122    /// The page manager that holds rows
123    /// including both their fixed and variable components.
124    pages: Pages,
125}
126
127impl TableInner {
128    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
129    ///
130    /// # Safety
131    ///
132    /// The requirement is that `table.is_row_present(ptr)` must hold,
133    /// where `table` is the `Table` which contains this `TableInner`.
134    /// That is, `ptr` must refer to a row within `self`
135    /// which was previously inserted and has not been deleted since.
136    ///
137    /// This means:
138    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
139    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
140    ///   and must refer to a valid, live row in that page.
141    /// - The `SquashedOffset` of `ptr` must match the enclosing table's `table.squashed_offset`.
142    ///
143    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
144    /// and has not been passed to [`Table::delete_internal_skip_pointer_map(table, ..)`]
145    /// is sufficient to demonstrate all of these properties.
146    unsafe fn get_row_ref_unchecked<'a>(
147        &'a self,
148        blob_store: &'a dyn BlobStore,
149        squashed_offset: SquashedOffset,
150        ptr: RowPointer,
151    ) -> RowRef<'a> {
152        // SAFETY: Forward caller requirements.
153        unsafe { RowRef::new(self, blob_store, squashed_offset, ptr) }
154    }
155
156    /// Returns whether the row at `ptr` is present or not.
157    // TODO: Remove all uses of this method,
158    //       or more likely, gate them behind `debug_assert!`
159    //       so they don't have semantic meaning.
160    //
161    //       Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
162    //       `RowPointer` is not content-addressed.
163    //       This means it is possible to:
164    //       - have a `RowPointer` A* to row A,
165    //       - Delete row A,
166    //       - Insert row B into the same storage as freed from A,
167    //       - Test `is_row_present(A*)`, which falsely reports that row A is still present.
168    //
169    //       In the final interface, this method is superfluous anyways,
170    //       as `RowPointer` is not part of our public interface.
171    //       Instead, we will always discover a known-present `RowPointer`
172    //       during a table scan or index seek.
173    //       As such, our `delete` and `insert` methods can be `unsafe`
174    //       and trust that the `RowPointer` is valid.
175    fn is_row_present(&self, _squashed_offset: SquashedOffset, ptr: RowPointer) -> bool {
176        if _squashed_offset != ptr.squashed_offset() {
177            return false;
178        }
179        let Some((page, offset)) = self.try_page_and_offset(ptr) else {
180            return false;
181        };
182        page.has_row_offset(self.row_layout.size(), offset)
183    }
184
185    fn try_page_and_offset(&self, ptr: RowPointer) -> Option<(&Page, PageOffset)> {
186        (ptr.page_index().idx() < self.pages.len()).then(|| (&self.pages[ptr.page_index()], ptr.page_offset()))
187    }
188
189    /// Returns the page and page offset that `ptr` points to.
190    fn page_and_offset(&self, ptr: RowPointer) -> (&Page, PageOffset) {
191        self.try_page_and_offset(ptr).unwrap()
192    }
193}
194
195static_assert_size!(Table, 264);
196
197impl MemoryUsage for Table {
198    fn heap_usage(&self) -> usize {
199        let Self {
200            inner,
201            pointer_map,
202            indexes,
203            // MEMUSE: intentionally ignoring schema
204            schema: _,
205            squashed_offset,
206            row_count,
207            blob_store_bytes,
208            is_scheduler,
209        } = self;
210        inner.heap_usage()
211            + pointer_map.heap_usage()
212            + indexes.heap_usage()
213            + squashed_offset.heap_usage()
214            + row_count.heap_usage()
215            + blob_store_bytes.heap_usage()
216            + is_scheduler.heap_usage()
217    }
218}
219
220impl MemoryUsage for TableInner {
221    fn heap_usage(&self) -> usize {
222        let Self {
223            row_layout,
224            static_layout,
225            visitor_prog,
226            pages,
227        } = self;
228        row_layout.heap_usage() + static_layout.heap_usage() + visitor_prog.heap_usage() + pages.heap_usage()
229    }
230}
231
232/// There was already a row with the same value.
233#[derive(Error, Debug, PartialEq, Eq)]
234#[error("Duplicate insertion of row {0:?} violates set semantics")]
235pub struct DuplicateError(pub RowPointer);
236
237/// Various errors that can happen during table insertion.
238#[derive(Error, Debug, PartialEq, Eq, EnumAsInner)]
239pub enum InsertError {
240    /// There was already a row with the same value.
241    #[error(transparent)]
242    Duplicate(#[from] DuplicateError),
243
244    /// Couldn't write the row to the page manager.
245    #[error(transparent)]
246    Bflatn(#[from] super::bflatn_to::Error),
247
248    /// Some index-related error occurred.
249    #[error(transparent)]
250    IndexError(#[from] UniqueConstraintViolation),
251}
252
253/// Errors that can occur while trying to read a value via bsatn.
254#[derive(Error, Debug)]
255pub enum ReadViaBsatnError {
256    #[error(transparent)]
257    BSatnError(#[from] BsatnError),
258
259    #[error(transparent)]
260    DecodeError(#[from] DecodeError),
261}
262
263// Public API:
264impl Table {
265    /// Creates a new empty table with the given `schema` and `squashed_offset`.
266    pub fn new(schema: Arc<TableSchema>, squashed_offset: SquashedOffset) -> Self {
267        let row_layout: RowTypeLayout = schema.get_row_type().clone().into();
268        let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout)));
269        let visitor_prog = row_type_visitor(&row_layout);
270        // By default, we start off with an empty pointer map,
271        // which is removed when the first unique index is added.
272        let pm = Some(PointerMap::default());
273        Self::new_raw(schema, row_layout, static_layout, visitor_prog, squashed_offset, pm)
274    }
275
276    /// Returns whether this is a scheduler table.
277    pub fn is_scheduler(&self) -> bool {
278        self.is_scheduler
279    }
280
281    /// Check if the `row` conflicts with any unique index on `self`,
282    /// and if there is a conflict, return `Err`.
283    ///
284    /// `is_deleted` is a predicate which, for a given row pointer,
285    /// returns true if and only if that row should be ignored.
286    /// While checking unique constraints against the committed state,
287    /// `MutTxId::insert` will ignore rows which are listed in the delete table.
288    ///
289    /// # Safety
290    ///
291    /// `row.row_layout() == self.row_layout()` must hold.
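    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); `row` is assumed to be a `RowRef`
    /// read from a table with the same row layout as `table`:
    ///
    /// ```ignore
    /// // Check `row` against every unique index, ignoring no existing rows.
    /// // SAFETY: `row` was read from a table with the same row layout as `table`.
    /// unsafe { table.check_unique_constraints(row, |indexes| indexes, |_ptr| false) }?;
    /// ```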
292    pub unsafe fn check_unique_constraints<'a, I: Iterator<Item = (&'a IndexId, &'a TableIndex)>>(
293        &'a self,
294        row: RowRef<'_>,
295        adapt: impl FnOnce(btree_map::Iter<'a, IndexId, TableIndex>) -> I,
296        mut is_deleted: impl FnMut(RowPointer) -> bool,
297    ) -> Result<(), UniqueConstraintViolation> {
298        for (&index_id, index) in adapt(self.indexes.iter()).filter(|(_, index)| index.is_unique()) {
299            // SAFETY: Caller promised that `row` has the same layout as `self`.
300            // Thus, as `index.indexed_columns` is in-bounds of `self`'s layout,
301            // it's also in-bounds of `row`'s layout.
302            let value = unsafe { row.project_unchecked(&index.indexed_columns) };
303            if index.seek_point(&value).next().is_some_and(|ptr| !is_deleted(ptr)) {
304                return Err(self.build_error_unique(index, index_id, value));
305            }
306        }
307        Ok(())
308    }
309
310    /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
311    ///
312    /// On success, returns the hash, if any, of the newly-inserted row,
313    /// and a `RowRef` referring to the row.
314    /// The hash is only computed if this table has a [`PointerMap`],
315    /// i.e., does not have any unique indexes.
316    /// If the table has unique indexes,
317    /// the returned `Option<RowHash>` will be `None`.
318    ///
319    /// When a row equal to `row` already exists in `self`,
320    /// returns `InsertError::Duplicate(existing_row_pointer)`,
321    /// where `existing_row_pointer` is a `RowPointer` which identifies the existing row.
322    /// In this case, the duplicate is not inserted,
323    /// but internal data structures may be altered in ways that affect performance and fragmentation.
324    ///
325    /// TODO(error-handling): describe errors from `write_row_to_pages` and return meaningful errors.
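    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here); `schema`, `pool`, `blob_store`,
    /// and `product_value` are assumed to be constructed elsewhere:
    ///
    /// ```ignore
    /// let mut table = Table::new(schema, SquashedOffset::TX_STATE);
    /// let (hash, row_ref) = table.insert(&pool, &mut blob_store, &product_value)?;
    /// // `hash` is `Some(_)` only if the table still has a pointer map,
    /// // i.e. it has no unique indexes.
    /// let ptr = row_ref.pointer();
    /// ```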
326    pub fn insert<'a>(
327        &'a mut self,
328        pool: &PagePool,
329        blob_store: &'a mut dyn BlobStore,
330        row: &ProductValue,
331    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
332        // Optimistically insert the `row` before checking any constraints
333        // under the assumption that errors (unique constraint & set semantic violations) are rare.
334        let (row_ref, blob_bytes) = self.insert_physically_pv(pool, blob_store, row)?;
335        let row_ptr = row_ref.pointer();
336
337        // Confirm the insertion, checking any constraints, removing the physical row on error.
338        // SAFETY: We just inserted `ptr`, so it must be present.
339        // Re. `CHECK_SAME_ROW = true`,
340        // where `insert` is called, we are not dealing with transactions,
341        // and we already know there cannot be a duplicate row error,
342        // but we check anyway, just in case.
343        let (hash, row_ptr) = unsafe { self.confirm_insertion::<true>(blob_store, row_ptr, blob_bytes) }?;
344        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
345        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row_ptr) };
346        Ok((hash, row_ref))
347    }
348
349    /// Physically inserts `row` into the page
350    /// without inserting it logically into the pointer map.
351    ///
352    /// This is useful when we need to insert a row temporarily to get back a `RowPointer`.
353    /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
354    pub fn insert_physically_pv<'a>(
355        &'a mut self,
356        pool: &PagePool,
357        blob_store: &'a mut dyn BlobStore,
358        row: &ProductValue,
359    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
360        // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
361        // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
362        let (ptr, blob_bytes) = unsafe {
363            write_row_to_pages(
364                pool,
365                &mut self.inner.pages,
366                &self.inner.visitor_prog,
367                blob_store,
368                &self.inner.row_layout,
369                row,
370                self.squashed_offset,
371            )
372        }?;
373        // SAFETY: We just inserted `ptr`, so it must be present.
374        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
375
376        Ok((row_ref, blob_bytes))
377    }
378
379    /// Physically insert a `row`, encoded in BSATN, into this table,
380    /// storing its large var-len members in the `blob_store`.
381    ///
382    /// On success, returns a `RowRef` referring to the newly-inserted row
383    /// and the number of blob store bytes added.
384    ///
385    /// This does not check for set semantic or unique constraints.
386    ///
387    /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`.
388    /// In this case, a call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
389    ///
390    /// When `row` is not valid BSATN at the table's row type,
391    /// an error is returned and there will be nothing for the caller to revert.
392    pub fn insert_physically_bsatn<'a>(
393        &'a mut self,
394        pool: &PagePool,
395        blob_store: &'a mut dyn BlobStore,
396        row: &[u8],
397    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
398        // Got a static layout? => Use fast-path insertion.
399        let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() {
400            // Before inserting, validate the row, ensuring type safety.
401            // SAFETY: The `static_validator` was derived from the same row layout as the static layout.
402            unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?;
403
404            let fixed_row_size = self.inner.row_layout.size();
405            let squashed_offset = self.squashed_offset;
406            let res = self
407                .inner
408                .pages
409                .with_page_to_insert_row(pool, fixed_row_size, 0, |page| {
410                    // SAFETY: We've used the right `row_size` and we trust that others have too.
411                    // `RowTypeLayout` also ensures that we satisfy the minimum row size.
412                    let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?;
413                    let (mut fixed, _) = page.split_fixed_var_mut();
414                    let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size);
415                    // SAFETY:
416                    // - We've validated that `row` is of sufficient length.
417                    // - The `fixed_buf` is exactly the right `fixed_row_size`.
418                    unsafe { static_layout.deserialize_row_into(fixed_buf, row) };
419                    Ok(fixed_offset)
420                })
421                .map_err(Error::PagesError)?;
422            match res {
423                (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()),
424                (_, Err(e)) => return Err(e),
425            }
426        } else {
427            // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
428            // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
429            unsafe {
430                write_row_to_pages_bsatn(
431                    pool,
432                    &mut self.inner.pages,
433                    &self.inner.visitor_prog,
434                    blob_store,
435                    &self.inner.row_layout,
436                    row,
437                    self.squashed_offset,
438                )
439            }?
440        };
441
442        // SAFETY: We just inserted `ptr`, so it must be present.
443        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
444
445        Ok((row_ref, blob_bytes))
446    }
447
448    /// Returns all the columns with sequences that need generation for this `row`.
449    ///
450    /// # Safety
451    ///
452    /// `self.is_row_present(row)` must hold.
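    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here); `allocate_seq_val` is a
    /// hypothetical stand-in for the datastore's sequence allocator:
    ///
    /// ```ignore
    /// // SAFETY: `ptr` was just returned by an insert, so the row is present.
    /// let (cols, seqs) = unsafe { table.sequence_triggers_for(&blob_store, ptr) };
    /// for (col_id, seq_id) in cols.iter().zip(seqs) {
    ///     let seq_val: i128 = allocate_seq_val(seq_id); // hypothetical
    ///     // SAFETY: the row is still present and `col_id` is an integer column with a sequence.
    ///     unsafe { table.write_gen_val_to_col(col_id, ptr, seq_val) };
    /// }
    /// ```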
453    pub unsafe fn sequence_triggers_for<'a>(
454        &'a self,
455        blob_store: &'a dyn BlobStore,
456        row: RowPointer,
457    ) -> (ColList, SeqIdList) {
458        let sequences = &*self.get_schema().sequences;
459        let row_ty = self.row_layout().product();
460
461        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
462        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) };
463
464        sequences
465            .iter()
466            // Find all the sequences that are triggered by this row.
467            .filter(|seq| {
468                // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements`
469                // as `row_ty` was derived from the same schema as `seq` is part of.
470                let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) };
471                // SAFETY:
472                // - `elem_ty` appears as a column in the row type.
473                // - `AlgebraicValue` is compatible with all types.
474                let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) };
475                val.is_numeric_zero()
476            })
477            .map(|seq| (seq.col_pos, seq.sequence_id))
478            .unzip()
479    }
480
481    /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`.
482    ///
483    /// Truncates the `seq_val` to fit the type of the column.
484    ///
485    /// # Safety
486    ///
487    /// - `self.is_row_present(row)` must hold.
488    /// - `col_id` must be a valid column, with a primitive integer type, of the row type.
489    pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) {
490        let row_ty = self.inner.row_layout.product();
491        // SAFETY: Caller promised that `col_id` was a valid column.
492        let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) };
493        let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else {
494            // SAFETY: Columns with sequences must be primitive types.
495            unsafe { unreachable_unchecked() }
496        };
497
498        let fixed_row_size = self.inner.row_layout.size();
499        let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size);
500
501        fn write<const N: usize>(dst: &mut [u8], offset: u16, bytes: [u8; N]) {
502            let offset = offset as usize;
503            dst[offset..offset + N].copy_from_slice(&bytes);
504        }
505
506        match col_typ {
507            PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()),
508            PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()),
509            PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()),
510            PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()),
511            PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()),
512            PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()),
513            PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()),
514            PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()),
515            PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()),
516            PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()),
517            PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()),
518            PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()),
519            // SAFETY: Columns with sequences must be integer types.
520            PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => unsafe { unreachable_unchecked() },
521        }
522    }
523
524    /// Performs all the checks necessary after having fully decided on a row's contents.
525    ///
526    /// This includes inserting the row into any applicable indices and/or the pointer map.
527    ///
528    /// On `Ok(_)`, statistics of the table are also updated,
529    /// and `ptr` still points to a valid row; on `Err(_)`, it does not.
530    ///
531    /// If `CHECK_SAME_ROW` holds, an identical row will be treated as a set-semantic duplicate.
532    /// Otherwise, it will be treated as a unique constraint violation.
533    /// However, `false` should only be passed if it's known beforehand that there is no identical row.
534    ///
535    /// # Safety
536    ///
537    /// `self.is_row_present(row)` must hold.
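    ///
    /// # Example
    ///
    /// An illustrative sketch of the two-phase insertion that [`Table::insert`] performs
    /// (not compiled here):
    ///
    /// ```ignore
    /// let (row_ref, blob_bytes) = table.insert_physically_pv(&pool, &mut blob_store, &row)?;
    /// let ptr = row_ref.pointer();
    /// // SAFETY: `ptr` was just physically inserted, so the row is present.
    /// let (hash, ptr) = unsafe { table.confirm_insertion::<true>(&mut blob_store, ptr, blob_bytes) }?;
    /// ```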
538    pub unsafe fn confirm_insertion<'a, const CHECK_SAME_ROW: bool>(
539        &'a mut self,
540        blob_store: &'a mut dyn BlobStore,
541        ptr: RowPointer,
542        blob_bytes: BlobNumBytes,
543    ) -> Result<(Option<RowHash>, RowPointer), InsertError> {
544        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
545        let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?;
546        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
547        unsafe { self.insert_into_indices::<CHECK_SAME_ROW>(blob_store, ptr) }?;
548
549        self.update_statistics_added_row(blob_bytes);
550        Ok((hash, ptr))
551    }
552
553    /// Confirms a row update, after first updating indices and checking constraints.
554    ///
555    /// On `Ok(_)`:
556    /// - the statistics of the table are also updated,
557    /// - `new_ptr` still points to a valid row.
558    ///
559    /// Otherwise, on `Err(_)`:
560    /// - `new_ptr` will not point to a valid row,
561    /// - the statistics won't be updated.
562    ///
563    /// # Safety
564    ///
565    /// `self.is_row_present(new_ptr)` and `self.is_row_present(old_ptr)` must hold.
566    pub unsafe fn confirm_update<'a>(
567        &'a mut self,
568        blob_store: &'a mut dyn BlobStore,
569        new_ptr: RowPointer,
570        old_ptr: RowPointer,
571        blob_bytes_added: BlobNumBytes,
572    ) -> Result<RowPointer, InsertError> {
573        // (1) Remove old row from indices.
574        // SAFETY: Caller promised that `self.is_row_present(old_ptr)` holds.
575        unsafe { self.delete_from_indices(blob_store, old_ptr) };
576
577        // Insert new row into indices.
578        // SAFETY: Caller promised that `self.is_row_present(new_ptr)` holds.
579        let res = unsafe { self.insert_into_indices::<true>(blob_store, new_ptr) };
580        if let Err(e) = res {
581            // Undo (1).
582            unsafe { self.insert_into_indices::<true>(blob_store, old_ptr) }
583                .expect("re-inserting the old row into indices should always work");
584            return Err(e);
585        }
586
587        // Remove the old row physically.
588        // SAFETY: The physical `old_ptr` still exists.
589        let blob_bytes_removed = unsafe { self.delete_internal_skip_pointer_map(blob_store, old_ptr) };
590        self.update_statistics_deleted_row(blob_bytes_removed);
591
592        // Update statistics.
593        self.update_statistics_added_row(blob_bytes_added);
594        Ok(new_ptr)
595    }
596
597    /// We've added a row, update the statistics to record this.
598    #[inline]
599    fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) {
600        self.row_count += 1;
601        self.blob_store_bytes += blob_bytes;
602    }
603
604    /// We've removed a row, update the statistics to record this.
605    #[inline]
606    fn update_statistics_deleted_row(&mut self, blob_bytes: BlobNumBytes) {
607        self.row_count -= 1;
608        self.blob_store_bytes -= blob_bytes;
609    }
610
611    /// Inserts the row identified by `new` into the table's indices.
612    /// This also checks unique constraints.
613    /// Deletes the row if there were any violations.
614    ///
615    /// If `CHECK_SAME_ROW`, upon a unique constraint violation,
616    /// this will check if it's really a duplicate row.
617    /// Otherwise, the unique constraint violation is returned.
618    ///
619    /// SAFETY: `self.is_row_present(new)` must hold.
620    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
621    unsafe fn insert_into_indices<'a, const CHECK_SAME_ROW: bool>(
622        &'a mut self,
623        blob_store: &'a mut dyn BlobStore,
624        new: RowPointer,
625    ) -> Result<(), InsertError> {
626        self.indexes
627            .iter_mut()
628            .try_for_each(|(index_id, index)| {
629                // SAFETY: Caller promised that `self.is_row_present(new)` holds.
630                let new = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, new) };
631                // SAFETY: any index in this table was constructed with the same row type as this table.
632                let violation = unsafe { index.check_and_insert(new) };
633                violation.map_err(|old| (*index_id, old, new))
634            })
635            .map_err(|(index_id, old, new)| {
636                // Found unique constraint violation!
637                if CHECK_SAME_ROW
638                    // If the index was added in this tx,
639                    // `old` could be a committed row,
640                    // which we want to avoid here.
641                    // TODO(centril): not 100% correct, could still be a duplicate,
642                    // but this is rather pathological and should be fixed when we restructure.
643                    && old.squashed_offset().is_tx_state()
644                    // SAFETY:
645                    // - The row layouts are the same as it's the same table.
646                    // - We know `old` exists in `self` as we just found it in an index.
647                    // - Caller promised that `new` is valid for `self`.
648                    && unsafe { Self::eq_row_in_page(self, old, self, new.pointer()) }
649                {
650                    return (index_id, DuplicateError(old).into());
651                }
652
653                let index = self.indexes.get(&index_id).unwrap();
654                let value = new.project(&index.indexed_columns).unwrap();
655                let error = self.build_error_unique(index, index_id, value).into();
656                (index_id, error)
657            })
658            .map_err(|(index_id, error)| {
659                // Delete row from indices.
660                // Do this before the actual deletion, as `index.delete` needs a `RowRef`
661                // so it can extract the appropriate value.
662                // SAFETY: We just inserted `new`, so it must be present.
663                unsafe { self.delete_from_indices_until(blob_store, new, index_id) };
664
665                // Cleanup: undo the physical insertion of `new`.
666                // SAFETY: We just inserted `new`, so it must be present.
667                unsafe { self.delete_internal(blob_store, new) };
668
669                error
670            })
671    }
672
673    /// Finds the [`RowPointer`] to the row in `target_table` equal, if any,
674    /// to the row `needle_ptr` in `needle_table`,
675    /// by any unique index in `target_table`.
676    ///
677    /// # Safety
678    ///
679    /// - `target_table` and `needle_table` must have the same `row_layout`.
680    /// - `needle_table.is_row_present(needle_ptr)` must hold.
681    unsafe fn find_same_row_via_unique_index(
682        target_table: &Table,
683        needle_table: &Table,
684        needle_bs: &dyn BlobStore,
685        needle_ptr: RowPointer,
686    ) -> Option<RowPointer> {
687        // Use some index (the one with the lowest `IndexId` currently).
688        // TODO(centril): this isn't what we actually want.
689        // Rather, we'd prefer the index with the simplest type,
690        // but this is left as future work as we don't have to optimize this method now.
691        let target_index = target_table
692            .indexes
693            .values()
694            .find(|idx| idx.is_unique())
695            .expect("there should be at least one unique index");
696        // Project the needle row to the columns of the index, and then seek.
697        // As this is a unique index, there are 0-1 rows for this key.
698        let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
699        let key = needle_row
700            .project(&target_index.indexed_columns)
701            .expect("needle row should be valid");
702        target_index.seek_point(&key).next().filter(|&target_ptr| {
703            // SAFETY:
704            // - Caller promised that the row layouts were the same.
705            // - We know `target_ptr` exists, as it was in `target_index`, belonging to `target_table`.
706            // - Caller promised that `needle_ptr` is valid for `needle_table`.
707            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
708        })
709    }
710
711    /// Insert the row identified by `ptr` into the table's [`PointerMap`],
712    /// if the table has one.
713    ///
714    /// This checks for set semantic violations.
715    /// If a set semantic conflict (i.e. duplicate row) is detected by the pointer map,
716    /// the row will be deleted and an error returned.
717    /// If the pointer map confirms that the row was unique, returns the `RowHash` of that row.
718    ///
719    /// If this table has no `PointerMap`, returns `Ok(None)`.
720    /// In that case, the row's uniqueness will be verified by [`Self::insert_into_indices`],
721    /// as this table has at least one unique index.
722    ///
723    /// SAFETY: `self.is_row_present(row)` must hold.
724    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
725    unsafe fn insert_into_pointer_map<'a>(
726        &'a mut self,
727        blob_store: &'a mut dyn BlobStore,
728        ptr: RowPointer,
729    ) -> Result<Option<RowHash>, DuplicateError> {
730        if self.pointer_map.is_none() {
731            // No pointer map? Set semantic constraint is checked by a unique index instead.
732            return Ok(None);
733        };
734
735        // SAFETY:
736        // - `self` trivially has the same `row_layout` as `self`.
737        // - Caller promised that `self.is_row_present(row)` holds.
738        let (hash, existing_row) = unsafe { Self::find_same_row_via_pointer_map(self, self, blob_store, ptr, None) };
739
740        if let Some(existing_row) = existing_row {
741            // If an equal row was already present,
742            // roll back our optimistic insert to avoid violating set semantics.
743
744            // SAFETY: Caller promised that `ptr` is a valid row in `self`.
745            unsafe {
746                self.inner
747                    .pages
748                    .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
749            };
750            return Err(DuplicateError(existing_row));
751        }
752
753        // If the optimistic insertion was correct,
754        // i.e. this is not a set-semantic duplicate,
755        // add it to the `pointer_map`.
756        self.pointer_map
757            .as_mut()
758            .expect("pointer map should exist, as it did previously")
759            .insert(hash, ptr);
760
761        Ok(Some(hash))
762    }
763
764    /// Returns the list of pointers to rows which hash to `row_hash`.
765    ///
766    /// If `self` does not have a [`PointerMap`], always returns the empty slice.
767    fn pointers_for(&self, row_hash: RowHash) -> &[RowPointer] {
768        self.pointer_map.as_ref().map_or(&[], |pm| pm.pointers_for(row_hash))
769    }
770
771    /// Using the [`PointerMap`],
772    /// searches `target_table` for a row equal to `needle_table[needle_ptr]`.
773    ///
774    /// Rows are compared for equality by [`eq_row_in_page`].
775    ///
776    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
777    ///
778    /// Used for detecting set-semantic duplicates when inserting
779    /// into tables without any unique constraints.
780    ///
781    /// Always returns `None` for the row pointer if `target_table` does not have a [`PointerMap`],
782    /// in which case the caller should instead use [`Self::find_same_row_via_unique_index`].
783    ///
784    /// Note that we don't need the blob store to compute equality,
785    /// as content-addressing means it's sufficient to compare the hashes of large blobs.
786    /// (If we see a collision in `BlobHash` we have bigger problems.)
787    ///
788    /// # Safety
789    ///
790    /// - `target_table` and `needle_table` must have the same `row_layout`.
791    /// - `needle_table.is_row_present(needle_ptr)`.
792    pub unsafe fn find_same_row_via_pointer_map(
793        target_table: &Table,
794        needle_table: &Table,
795        needle_bs: &dyn BlobStore,
796        needle_ptr: RowPointer,
797        row_hash: Option<RowHash>,
798    ) -> (RowHash, Option<RowPointer>) {
799        let row_hash = row_hash.unwrap_or_else(|| {
800            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
801            let row_ref = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
802            row_ref.row_hash()
803        });
804
805        // Scan all the row pointers with `row_hash` in the `target_table`.
806        let row_ptr = target_table.pointers_for(row_hash).iter().copied().find(|&target_ptr| {
807            // SAFETY:
808            // - Caller promised that the row layouts were the same.
809            // - We know `target_ptr` exists, as it was found in a pointer map.
810            // - Caller promised that `needle_ptr` is valid for `needle_table`.
811            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
812        });
813
814        (row_hash, row_ptr)
815    }
816
817    /// Returns whether the row `target_ptr` in `target_table`
818    /// is exactly equal to the row `needle_ptr` in `needle_table`.
819    ///
820    /// # Safety
821    ///
822    /// - `target_table` and `needle_table` must have the same `row_layout`.
823    /// - `target_table.is_row_present(target_ptr)`.
824    /// - `needle_table.is_row_present(needle_ptr)`.
825    pub unsafe fn eq_row_in_page(
826        target_table: &Table,
827        target_ptr: RowPointer,
828        needle_table: &Table,
829        needle_ptr: RowPointer,
830    ) -> bool {
831        let (target_page, target_offset) = target_table.inner.page_and_offset(target_ptr);
832        let (needle_page, needle_offset) = needle_table.inner.page_and_offset(needle_ptr);
833
834        // SAFETY:
835        // - Caller promised that `target_ptr` is valid, so `target_page` and `target_offset` are both valid.
836        // - Caller promised that `needle_ptr` is valid, so `needle_page` and `needle_offset` are both valid.
837        // - Caller promised that the layouts of `target_table` and `needle_table` are the same,
838        //   so `target_table` applies to both.
839        //   Moreover `(x: Table).inner.static_layout` is always derived from `x.row_layout`.
840        unsafe {
841            eq_row_in_page(
842                target_page,
843                needle_page,
844                target_offset,
845                needle_offset,
846                &target_table.inner.row_layout,
847                target_table.static_layout(),
848            )
849        }
850    }
851
852    /// Searches `target_table` for a row equal to `needle_table[needle_ptr]`,
853    /// and returns the [`RowPointer`] to that row in `target_table`, if it exists.
854    ///
855    /// Searches using the [`PointerMap`] or a unique index, as appropriate for the table.
856    ///
857    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
858    ///
859    /// # Safety
860    ///
861    /// - `target_table` and `needle_table` must have the same `row_layout`.
862    /// - `needle_table.is_row_present(needle_ptr)` must hold.
863    pub unsafe fn find_same_row(
864        target_table: &Table,
865        needle_table: &Table,
866        needle_bs: &dyn BlobStore,
867        needle_ptr: RowPointer,
868        row_hash: Option<RowHash>,
869    ) -> (Option<RowHash>, Option<RowPointer>) {
870        if target_table.pointer_map.is_some() {
871            // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
872            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
873            let (row_hash, row_ptr) = unsafe {
874                Self::find_same_row_via_pointer_map(target_table, needle_table, needle_bs, needle_ptr, row_hash)
875            };
876            (Some(row_hash), row_ptr)
877        } else {
878            (
879                row_hash,
880                // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
881                // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
882                unsafe { Self::find_same_row_via_unique_index(target_table, needle_table, needle_bs, needle_ptr) },
883            )
884        }
885    }
886
887    /// Returns a [`RowRef`] for `ptr` or `None` if the row isn't present.
888    pub fn get_row_ref<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> Option<RowRef<'a>> {
889        self.is_row_present(ptr)
890            // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
891            .then(|| unsafe { self.get_row_ref_unchecked(blob_store, ptr) })
892    }
893
894    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
895    ///
896    /// # Safety
897    ///
898    /// The requirement is that `self.is_row_present(ptr)` must hold.
899    /// That is, `ptr` must refer to a row within `self`
900    /// which was previously inserted and has not been deleted since.
901    ///
902    /// This means:
903    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
904    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
905    ///   and must refer to a valid, live row in that page.
906    /// - The `SquashedOffset` of `ptr` must match `self.squashed_offset`.
907    ///
908    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
909    /// and has not been passed to [`Table::delete(table, ..)`]
910    /// is sufficient to demonstrate all of these properties.
911    pub unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
912        // SAFETY: Caller promised that ^-- holds.
913        unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) }
914    }
915
916    /// Deletes a row in the page manager
917    /// without deleting it logically in the pointer map.
918    ///
919    /// # Safety
920    ///
921    /// `ptr` must point to a valid, live row in this table.
922    pub unsafe fn delete_internal_skip_pointer_map(
923        &mut self,
924        blob_store: &mut dyn BlobStore,
925        ptr: RowPointer,
926    ) -> BlobNumBytes {
927        debug_assert!(self.is_row_present(ptr));
928        // Delete the physical row.
929        //
930        // SAFETY:
931        // - `ptr` points to a valid row in this table, per our invariants.
932        // - `self.row_size` known to be consistent with `self.pages`,
933        //    as the two are tied together in `Table::new`.
934        unsafe {
935            self.inner
936                .pages
937                .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
938        }
939    }
940
941    /// Deletes the row identified by `ptr` from the table.
942    ///
943    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
944    ///
945    /// NOTE: This method skips updating indexes.
946    /// Use `delete_unchecked` or `delete` to delete a row with index updating.
947    ///
948    /// SAFETY: `self.is_row_present(row)` must hold.
949    unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
950        // Remove the set semantic association.
951        if let Some(pointer_map) = &mut self.pointer_map {
952            // SAFETY: `self.is_row_present(row)` holds.
953            let row = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
954
955            let _remove_result = pointer_map.remove(row.row_hash(), ptr);
956            debug_assert!(_remove_result);
957        }
958
959        // Delete the physical row.
960        // SAFETY: `ptr` points to a valid row in this table as `self.is_row_present(row)` holds.
961        unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) }
962    }
963
964    /// Deletes the row identified by `ptr` from the table.
965    ///
966    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
967    ///
968    /// SAFETY: `self.is_row_present(row)` must hold.
969    unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
970        // Delete row from indices.
971        // Do this before the actual deletion, as `index.delete` needs a `RowRef`
972        // so it can extract the appropriate value.
973        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
974        unsafe { self.delete_from_indices(blob_store, ptr) };
975
976        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
977        unsafe { self.delete_internal(blob_store, ptr) }
978    }
979
980    /// Deletes the row identified by `ptr` from all indices of this table
981    /// whose `IndexId` is strictly less than `index_id`.
982    ///
983    /// SAFETY: `self.is_row_present(row)` must hold.
984    unsafe fn delete_from_indices_until(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer, index_id: IndexId) {
985        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
986        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
987
988        for (_, index) in self.indexes.range_mut(..index_id) {
989            index.delete(row_ref).unwrap();
990        }
991    }
992
993    /// Deletes the row identified by `ptr` from all the indices of this table.
994    ///
995    /// SAFETY: `self.is_row_present(row)` must hold.
996    unsafe fn delete_from_indices(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer) {
997        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
998        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
999
1000        for index in self.indexes.values_mut() {
1001            index.delete(row_ref).unwrap();
1002        }
1003    }
1004
1005    /// Deletes the row identified by `ptr` from the table.
1006    ///
1007    /// The function `before` is run on the to-be-deleted row,
1008    /// if it is present, before deleting.
1009    /// This enables callers to extract the deleted row.
1010    /// E.g. applying deletes when squashing/merging a transaction into the committed state
1011    /// passes `|row| row.to_product_value()` as `before`
1012    /// so that the resulting `ProductValue`s can be passed to the subscription evaluator.
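    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here); `ptr` is assumed to come from
    /// an earlier insert or scan:
    ///
    /// ```ignore
    /// let deleted: Option<ProductValue> =
    ///     table.delete(&mut blob_store, ptr, |row| row.to_product_value());
    /// ```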
1013    pub fn delete<'a, R>(
1014        &'a mut self,
1015        blob_store: &'a mut dyn BlobStore,
1016        ptr: RowPointer,
1017        before: impl for<'b> FnOnce(RowRef<'b>) -> R,
1018    ) -> Option<R> {
1019        if !self.is_row_present(ptr) {
1020            return None;
1021        };
1022
1023        // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
1024        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, ptr) };
1025
1026        let ret = before(row_ref);
1027
1028        // SAFETY: We've checked above that `self.is_row_present(ptr)`.
1029        let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) };
1030        self.update_statistics_deleted_row(blob_bytes_deleted);
1031
1032        Some(ret)
1033    }
1034
1035    /// If a row exists in `self` which matches `row`
1036    /// by [`Table::find_same_row`],
1037    /// delete that row.
1038    ///
1039    /// If a matching row was found, returns the pointer to that row.
1040    /// The returned pointer is now invalid, as the row to which it referred has been deleted.
1041    ///
1042    /// This operation works by temporarily inserting the `row` into `self`,
1043    /// checking `find_same_row` on the newly-inserted row,
1044    /// deleting the matching row if it exists,
1045    /// then deleting the temporary insertion.
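    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here); `row` is a `ProductValue`
    /// equal to a previously inserted row:
    ///
    /// ```ignore
    /// if let Some(deleted_ptr) = table.delete_equal_row(&pool, &mut blob_store, &row)? {
    ///     // The matching row is gone; `deleted_ptr` must not be dereferenced anymore.
    /// }
    /// ```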
1046    pub fn delete_equal_row(
1047        &mut self,
1048        pool: &PagePool,
1049        blob_store: &mut dyn BlobStore,
1050        row: &ProductValue,
1051    ) -> Result<Option<RowPointer>, Error> {
1052        // Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row.
1053        // This must avoid consulting and inserting to the pointer map,
1054        // as the row is already present, set-semantically.
1055        let (temp_row, _) = self.insert_physically_pv(pool, blob_store, row)?;
1056        let temp_ptr = temp_row.pointer();
1057
1058        // Find the row equal to the passed-in `row`.
1059        // This uses one of two approaches.
1060        // Either there is a pointer map, so we use that,
1061        // or there is at least one unique index, so we use one of them.
1062        //
1063        // SAFETY:
1064        // - `self` trivially has the same `row_layout` as `self`.
1065        // - We just inserted `temp_ptr`, so it's valid.
1066        let (_, existing_row_ptr) = unsafe { Self::find_same_row(self, self, blob_store, temp_ptr, None) };
1067
1068        // If an equal row was present, delete it.
1069        if let Some(existing_row_ptr) = existing_row_ptr {
1070            let blob_bytes_deleted = unsafe {
1071                // SAFETY: `find_same_row` ensures that the pointer is valid.
1072                self.delete_unchecked(blob_store, existing_row_ptr)
1073            };
1074            self.update_statistics_deleted_row(blob_bytes_deleted);
1075        }
1076
1077        // Remove the temporary row we inserted in the beginning.
1078        // Avoid the pointer map, since we don't want to delete it twice.
1079        // SAFETY: `ptr` is valid as we just inserted it.
1080        unsafe {
1081            self.delete_internal_skip_pointer_map(blob_store, temp_ptr);
1082        }
1083
1084        Ok(existing_row_ptr)
1085    }
1086
1087    /// Returns the row type for rows in this table.
1088    pub fn get_row_type(&self) -> &ProductType {
1089        self.get_schema().get_row_type()
1090    }
1091
1092    /// Returns the schema for this table.
1093    pub fn get_schema(&self) -> &Arc<TableSchema> {
1094        &self.schema
1095    }
1096
1097    /// Runs a mutation on the [`TableSchema`] of this table.
1098    ///
1099    /// This uses a clone-on-write mechanism.
1100    /// If none but `self` refers to the schema, then the mutation will be in-place.
1101    /// Otherwise, the schema must be cloned, mutated,
1102    /// and then the cloned version is written back to the table.
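    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here), reusing [`TableSchema::remove_index`]:
    ///
    /// ```ignore
    /// // Clones the schema only if something else still holds a reference to it.
    /// let removed = table.with_mut_schema(|schema| schema.remove_index(index_id));
    /// ```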
1103    pub fn with_mut_schema<R>(&mut self, with: impl FnOnce(&mut TableSchema) -> R) -> R {
1104        with(Arc::make_mut(&mut self.schema))
1105    }
1106
1107    /// Returns a new [`TableIndex`] for this table.
1108    pub fn new_index(&self, algo: &IndexAlgorithm, is_unique: bool) -> Result<TableIndex, InvalidFieldError> {
1109        TableIndex::new(self.get_schema().get_row_type(), algo, is_unique)
1110    }
1111
1112    /// Inserts a new `index` into the table.
1113    ///
1114    /// The index will be populated using the rows of the table.
1115    ///
1116    /// # Panics
1117    ///
1118    /// Panics if any row would violate `index`'s unique constraint, if it has one.
1119    ///
1120    /// # Safety
1121    ///
1122    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
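    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here); `algo` and `index_id` are assumed
    /// to come from the index's schema definition:
    ///
    /// ```ignore
    /// let index = table.new_index(&algo, /* is_unique */ true)?;
    /// // SAFETY: `index` was built from this table's own row type via `new_index`.
    /// unsafe { table.insert_index(&blob_store, index_id, index) };
    /// ```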
1123    pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) {
1124        let rows = self.scan_rows(blob_store);
1125        // SAFETY: Caller promised that table's row type/layout
1126        // matches that which `index` was constructed with.
1127        // It follows that this applies to any `rows`, as required.
1128        let violation = unsafe { index.build_from_rows(rows) };
1129        violation.unwrap_or_else(|ptr| {
1130            panic!("adding `index` should cause no unique constraint violations, but {ptr:?} would")
1131        });
1132        // SAFETY: Forward caller requirement.
1133        unsafe { self.add_index(index_id, index) };
1134    }
1135
1136    /// Adds an index to the table without populating it from the table's existing rows.
1137    ///
1138    /// # Safety
1139    ///
1140    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
1141    pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) -> Option<PointerMap> {
1142        let is_unique = index.is_unique();
1143        self.indexes.insert(index_id, index);
1144
1145        // Remove the pointer map, if any.
1146        if is_unique {
1147            self.pointer_map.take()
1148        } else {
1149            None
1150        }
1151    }
1152
1153    /// Removes an index from the table.
1154    ///
1155    /// Returns the removed [`TableIndex`] and its [`IndexSchema`], or `None` if no index with `index_id` existed.
1156    pub fn delete_index(
1157        &mut self,
1158        blob_store: &dyn BlobStore,
1159        index_id: IndexId,
1160        pointer_map: Option<PointerMap>,
1161    ) -> Option<(TableIndex, IndexSchema)> {
1162        let index = self.indexes.remove(&index_id)?;
1163
1164        // If we removed the last unique index, add a pointer map.
1165        if index.is_unique() && !self.indexes.values().any(|idx| idx.is_unique()) {
1166            self.pointer_map = Some(pointer_map.unwrap_or_else(|| self.rebuild_pointer_map(blob_store)));
1167        }
1168
1169        // Remove index from schema.
1170        //
1171        // This will likely do a clone-then-write, as over time
1172        // the schema may have acquired other referents.
1173        let schema = self
1174            .with_mut_schema(|s| s.remove_index(index_id))
1175            .expect("there should be an index with `index_id`");
1176        Some((index, schema))
1177    }
1178
1179    /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s.
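    ///
    /// # Example
    ///
    /// An illustrative sketch of a full table scan (not compiled here):
    ///
    /// ```ignore
    /// for row_ref in table.scan_rows(&blob_store) {
    ///     let pv = row_ref.to_product_value();
    ///     // ... use `pv` ...
    /// }
    /// ```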
1180    pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> {
1181        TableScanIter {
1182            current_page: None, // Will be filled by the iterator.
1183            current_page_idx: PageIndex(0),
1184            table: self,
1185            blob_store,
1186        }
1187    }
1188
1189    /// Returns this table combined with the index for [`IndexId`], if any.
1190    pub fn get_index_by_id_with_table<'a>(
1191        &'a self,
1192        blob_store: &'a dyn BlobStore,
1193        index_id: IndexId,
1194    ) -> Option<TableAndIndex<'a>> {
1195        Some(TableAndIndex {
1196            table: self,
1197            blob_store,
1198            index: self.get_index_by_id(index_id)?,
1199        })
1200    }
1201
1202    /// Returns the [`TableIndex`] for this [`IndexId`].
1203    pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&TableIndex> {
1204        self.indexes.get(&index_id)
1205    }
1206
1207    /// Returns this table combined with the first index with `cols`, if any.
1208    pub fn get_index_by_cols_with_table<'a>(
1209        &'a self,
1210        blob_store: &'a dyn BlobStore,
1211        cols: &ColList,
1212    ) -> Option<TableAndIndex<'a>> {
1213        let (_, index) = self.get_index_by_cols(cols)?;
1214        Some(TableAndIndex {
1215            table: self,
1216            blob_store,
1217            index,
1218        })
1219    }
1220
1221    /// Returns the first [`TableIndex`] with the given [`ColList`].
1222    pub fn get_index_by_cols(&self, cols: &ColList) -> Option<(IndexId, &TableIndex)> {
1223        self.indexes
1224            .iter()
1225            .find(|(_, index)| &index.indexed_columns == cols)
1226            .map(|(id, idx)| (*id, idx))
1227    }
1228
1229    /// Clones the structure of this table into a new one with
1230    /// the same schema, visitor program, and indices.
1231    /// The new table will be completely empty
1232    /// and will use the given `squashed_offset` instead of that of `self`.
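    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); it assumes a committed `table`:
    ///
    /// ```ignore
    /// // Build an empty tx-state scratchpad table sharing the schema and index structure.
    /// let tx_table = table.clone_structure(SquashedOffset::TX_STATE);
    /// assert_eq!(tx_table.num_rows(), 0);
    /// assert_eq!(tx_table.num_indices(), table.num_indices());
    /// ```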
1233    pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self {
1234        // Clone a bunch of static data.
1235        // NOTE(centril): It's important that these be cheap to clone.
1236        // This is why they are all `Arc`ed or have some sort of small-vec optimization.
1237        let schema = self.schema.clone();
1238        let layout = self.row_layout().clone();
1239        let sbl = self.inner.static_layout.clone();
1240        let visitor = self.inner.visitor_prog.clone();
1241
1242        // If we had a pointer map, we'll have one in the cloned one as well, but empty.
1243        let pm = self.pointer_map.as_ref().map(|_| PointerMap::default());
1244
1245        // Make the new table.
1246        let mut new = Table::new_raw(schema, layout, sbl, visitor, squashed_offset, pm);
1247
1248        // Clone the index structure. The table is empty, so no need to `build_from_rows`.
1249        for (&index_id, index) in self.indexes.iter() {
1250            new.indexes.insert(index_id, index.clone_structure());
1251        }
1252        new
1253    }
1254
1255    /// Returns the number of bytes occupied by the pages and the blob store.
1256    /// Note that the result can be more than the actual physical size occupied by the table
1257    /// because the blob store implementation can do internal optimizations.
1258    /// For more details, refer to the documentation of `self.blob_store_bytes`.
1259    pub fn bytes_occupied_overestimate(&self) -> usize {
1260        (self.num_pages() * PAGE_DATA_SIZE) + (self.blob_store_bytes.0)
1261    }
1262
1263    /// Reset the internal storage of `self` to be `pages`.
1264    ///
1265    /// This recomputes the pointer map based on the `pages`,
1266    /// but does not recompute indexes.
1267    ///
1268    /// Used when restoring from a snapshot.
1269    ///
1270    /// # Safety
1271    ///
1272    /// The schema of rows stored in the `pages` must exactly match `self.schema` and `self.inner.row_layout`.
1273    pub unsafe fn set_pages(&mut self, pages: Vec<Box<Page>>, blob_store: &dyn BlobStore) {
1274        self.inner.pages.set_contents(pages, self.inner.row_layout.size());
1275
1276        // Recompute table metadata based on the new pages.
1277        // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate.
1278        self.compute_row_count(blob_store);
1279        self.pointer_map = Some(self.rebuild_pointer_map(blob_store));
1280    }
1281
1282    /// Consumes the table, returning some constituents needed for merge.
1283    pub fn consume_for_merge(
1284        self,
1285    ) -> (
1286        Arc<TableSchema>,
1287        impl Iterator<Item = (IndexId, TableIndex)>,
1288        impl Iterator<Item = Box<Page>>,
1289    ) {
1290        (self.schema, self.indexes.into_iter(), self.inner.pages.into_page_iter())
1291    }
1292
1293    /// Returns the number of rows resident in this table.
1294    ///
1295    /// This method runs in constant time.
1296    pub fn num_rows(&self) -> u64 {
1297        self.row_count
1298    }
1299
1300    #[cfg(test)]
1301    fn reconstruct_num_rows(&self) -> u64 {
1302        self.pages().iter().map(|page| page.reconstruct_num_rows() as u64).sum()
1303    }
1304
1305    /// Returns the number of bytes used by rows resident in this table.
1306    ///
1307    /// This includes data bytes, padding bytes and some overhead bytes,
1308    /// as described in the docs for [`Page::bytes_used_by_rows`],
1309    /// but *does not* include:
1310    ///
1311    /// - Unallocated space within pages.
1312    /// - Per-page overhead (e.g. page headers).
1313    /// - Table overhead (e.g. the [`RowTypeLayout`], [`PointerMap`], [`Schema`] &c).
1314    /// - Indexes.
1315    /// - Large blobs in the [`BlobStore`].
1316    ///
1317    /// Of these, the caller should inspect the blob store in order to account for memory usage by large blobs,
1318    /// and call [`Self::bytes_used_by_index_keys`] to account for indexes,
1319    /// but we intend to eat all the other overheads when billing.
1320    ///
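    /// # Example
    ///
    /// A minimal sketch (not compiled here) of combining the tracked per-table metrics;
    /// large blobs in the [`BlobStore`] would still need to be inspected separately:
    ///
    /// ```ignore
    /// let billed_bytes = table.bytes_used_by_rows() + table.bytes_used_by_index_keys();
    /// ```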
1321    // TODO(perf, centril): consider storing the total number of granules in the table instead
1322    // so that this runs in constant time rather than O(|Pages|).
1323    pub fn bytes_used_by_rows(&self) -> u64 {
1324        self.pages()
1325            .iter()
1326            .map(|page| page.bytes_used_by_rows(self.inner.row_layout.size()) as u64)
1327            .sum()
1328    }
1329
1330    #[cfg(test)]
1331    fn reconstruct_bytes_used_by_rows(&self) -> u64 {
1332        self.pages()
1333            .iter()
1334            .map(|page| unsafe {
1335                // Safety: `page` is in `self`, and was constructed using `self.inner.row_layout` and `self.inner.visitor_prog`,
1336                // so the three are mutually consistent.
1337                page.reconstruct_bytes_used_by_rows(self.inner.row_layout.size(), &self.inner.visitor_prog)
1338            } as u64)
1339            .sum()
1340    }
1341
1342    /// Returns the number of indices in this table.
1343    pub fn num_indices(&self) -> usize {
1344        self.indexes.len()
1345    }
1346
1347    /// Returns the number of rows (or [`RowPointer`]s, more accurately)
1348    /// stored in indexes by this table.
1349    ///
1350    /// This method runs in constant time.
1351    pub fn num_rows_in_indexes(&self) -> u64 {
1352        // Assume that each index contains all rows in the table.
1353        self.num_rows() * self.indexes.len() as u64
1354    }
1355
1356    /// Returns the number of bytes used by keys stored in indexes by this table.
1357    ///
1358    /// This method scales in runtime with the number of indexes in the table,
1359    /// but not with the number of pages or rows.
1360    ///
1361    /// Key size is measured using a metric called "key size" or "data size,"
1362    /// which is intended to capture the number of live user-supplied bytes,
1363    /// not including representational overhead.
1364    /// This is distinct from the BFLATN size measured by [`Self::bytes_used_by_rows`].
1365    /// See the trait [`crate::table_index::KeySize`] for specifics on the metric measured.
1366    pub fn bytes_used_by_index_keys(&self) -> u64 {
1367        self.indexes.values().map(|idx| idx.num_key_bytes()).sum()
1368    }
1369}
1370
1371/// A reference to a single row within a table.
1372///
1373/// # Safety
1374///
1375/// Having a `r: RowRef` is a proof that [`r.pointer()`](RowRef::pointer) refers to a valid row.
1376/// This makes constructing a `RowRef`, i.e., `RowRef::new`, an `unsafe` operation.
1377#[derive(Copy, Clone)]
1378pub struct RowRef<'a> {
1379    /// The table that has the row at `self.pointer`.
1380    table: &'a TableInner,
1381    /// The blob store used in case there are blob hashes to resolve.
1382    blob_store: &'a dyn BlobStore,
1383    /// The pointer to the row in `self.table`.
1384    pointer: RowPointer,
1385}
1386
1387impl fmt::Debug for RowRef<'_> {
1388    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1389        fmt.debug_struct("RowRef")
1390            .field("pointer", &self.pointer)
1391            .field("value", &self.to_product_value())
1392            .finish_non_exhaustive()
1393    }
1394}
1395
1396impl<'a> RowRef<'a> {
1397    /// Construct a `RowRef` to the row at `pointer` within `table`.
1398    ///
1399    /// # Safety
1400    ///
1401    /// `pointer` must refer to a row within `table`
1402    /// which was previously inserted and has not been deleted since.
1403    ///
1404    /// This means:
1405    /// - The `PageIndex` of `pointer` must be in-bounds for `table.pages`.
1406    /// - The `PageOffset` of `pointer` must be properly aligned for the row type of `table`,
1407    ///   and must refer to a valid, live row in that page.
1408    /// - The `SquashedOffset` of `pointer` must match `table.squashed_offset`.
1409    ///
1410    /// Showing that `pointer` was the result of a call to `table.insert`
1411    /// and has not been passed to `table.delete`
1412    /// is sufficient to demonstrate all of these properties.
1413    unsafe fn new(
1414        table: &'a TableInner,
1415        blob_store: &'a dyn BlobStore,
1416        _squashed_offset: SquashedOffset,
1417        pointer: RowPointer,
1418    ) -> Self {
1419        debug_assert!(table.is_row_present(_squashed_offset, pointer));
1420        Self {
1421            table,
1422            blob_store,
1423            pointer,
1424        }
1425    }
1426
1427    /// Extract a `ProductValue` from the table.
1428    ///
1429    /// This is a potentially expensive operation,
1430    /// as it must walk the table's `ProductTypeLayout`
1431    /// and heap-allocate various substructures of the `ProductValue`.
1432    pub fn to_product_value(&self) -> ProductValue {
1433        let res = self
1434            .serialize(ValueSerializer)
1435            .unwrap_or_else(|x| match x {})
1436            .into_product();
1437        // SAFETY: the top layer of a row when serialized is always a product.
1438        unsafe { res.unwrap_unchecked() }
1439    }
1440
1441    /// Check that the `col`th column of the row type stored by `self` is compatible with `T`,
1442    /// and read the value of that column from `self`.
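    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); it assumes column 0 of the row type is a `u64`
    /// and that `row_ref` was obtained from this table:
    ///
    /// ```ignore
    /// let id: u64 = row_ref.read_col(ColId(0))?;
    /// ```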
1443    #[inline]
1444    pub fn read_col<T: ReadColumn>(self, col: impl Into<ColId>) -> Result<T, TypeError> {
1445        T::read_column(self, col.into().idx())
1446    }
1447
1448    /// Construct a projection of the row at `self` by extracting the `cols`.
1449    ///
1450    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1451    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
1452    ///
1453    /// # Safety
1454    ///
1455    /// - `cols` must not specify any column which is out-of-bounds for the row `self`.
1456    pub unsafe fn project_unchecked(self, cols: &ColList) -> AlgebraicValue {
1457        let col_layouts = &self.row_layout().product().elements;
1458
1459        if let Some(head) = cols.as_singleton() {
1460            let head = head.idx();
1461            // SAFETY: caller promised that `head` is in-bounds of `col_layouts`.
1462            let col_layout = unsafe { col_layouts.get_unchecked(head) };
1463            // SAFETY:
1464            // - `col_layout` was just derived from the row layout.
1465            // - `AlgebraicValue` is compatible with any  `col_layout`.
1466            // - `self` is a valid row and offsetting to `col_layout` is valid.
1467            return unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) };
1468        }
1469        let mut elements = Vec::with_capacity(cols.len() as usize);
1470        for col in cols.iter() {
1471            let col = col.idx();
1472            // SAFETY: caller promised that any `col` is in-bounds of `col_layouts`.
1473            let col_layout = unsafe { col_layouts.get_unchecked(col) };
1474            // SAFETY:
1475            // - `col_layout` was just derived from the row layout.
1476            // - `AlgebraicValue` is compatible with any  `col_layout`.
1477            // - `self` is a valid row and offsetting to `col_layout` is valid.
1478            elements.push(unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) });
1479        }
1480        AlgebraicValue::product(elements)
1481    }
1482
1483    /// Construct a projection of the row at `self` by extracting the `cols`.
1484    ///
1485    /// Returns an error if `cols` specifies an index which is out-of-bounds for the row at `self`.
1486    ///
1487    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1488    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
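    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); it assumes `row_ref` comes from a table with
    /// at least two columns:
    ///
    /// ```ignore
    /// // A single column projects to the bare column value...
    /// let single = row_ref.project(&ColList::new(ColId(0)))?;
    /// // ...while several columns project to an `AlgebraicValue::Product`.
    /// let pair = row_ref.project(&ColList::from([ColId(0), ColId(1)]))?;
    /// ```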
1489    pub fn project(self, cols: &ColList) -> Result<AlgebraicValue, InvalidFieldError> {
1490        if let Some(head) = cols.as_singleton() {
1491            return self.read_col(head).map_err(|_| head.into());
1492        }
1493        let mut elements = Vec::with_capacity(cols.len() as usize);
1494        for col in cols.iter() {
1495            let col_val = self.read_col(col).map_err(|err| match err {
1496                TypeError::WrongType { .. } => {
1497                    unreachable!("AlgebraicValue::read_column never returns a `TypeError::WrongType`")
1498                }
1499                TypeError::IndexOutOfBounds { .. } => col,
1500            })?;
1501            elements.push(col_val);
1502        }
1503        Ok(AlgebraicValue::product(elements))
1504    }
1505
1506    /// Returns the raw row pointer for this row reference.
1507    pub fn pointer(&self) -> RowPointer {
1508        self.pointer
1509    }
1510
1511    /// Returns the blob store that any [`crate::blob_store::BlobHash`]es within the row refer to.
1512    pub(crate) fn blob_store(&self) -> &dyn BlobStore {
1513        self.blob_store
1514    }
1515
1516    /// Return the layout of the row.
1517    ///
1518    /// All rows within the same table will have the same layout.
1519    pub fn row_layout(&self) -> &RowTypeLayout {
1520        &self.table.row_layout
1521    }
1522
1523    /// Returns the page the row is in and the offset of the row within that page.
1524    pub fn page_and_offset(&self) -> (&Page, PageOffset) {
1525        self.table.page_and_offset(self.pointer())
1526    }
1527
1528    /// Returns the bytes for the fixed portion of this row.
1529    pub(crate) fn get_row_data(&self) -> &Bytes {
1530        let (page, offset) = self.page_and_offset();
1531        page.get_row_data(offset, self.table.row_layout.size())
1532    }
1533
1534    /// Returns the row hash of the row referred to by `self`.
1535    pub fn row_hash(&self) -> RowHash {
1536        RowHash(RowHash::hasher_builder().hash_one(self))
1537    }
1538
1539    /// Returns the static layout for this row reference, if any.
1540    pub fn static_layout(&self) -> Option<&StaticLayout> {
1541        self.table.static_layout.as_ref().map(|(s, _)| s)
1542    }
1543
1544    /// Encode the row referred to by `self` into a `Vec<u8>` using BSATN and then deserialize it.
1545    pub fn read_via_bsatn<T>(&self, scratch: &mut Vec<u8>) -> Result<T, ReadViaBsatnError>
1546    where
1547        T: DeserializeOwned,
1548    {
1549        self.to_bsatn_extend(scratch)?;
1550        Ok(bsatn::from_slice::<T>(scratch)?)
1551    }
1552
1553    /// Return the number of bytes in the blob store to which this object holds a reference.
1554    ///
1555    /// Used to compute the table's `blob_store_bytes` when reconstructing a snapshot.
1556    ///
1557    /// Even within a single row, this is a conservative overestimate,
1558    /// as a row may contain multiple references to the same large blob.
1559    /// This seems unlikely to occur in practice.
1560    fn blob_store_bytes(&self) -> usize {
1561        let row_data = self.get_row_data();
1562        let (page, _) = self.page_and_offset();
1563        // SAFETY:
1564        // - Existence of a `RowRef` treated as proof
1565        //   of the row's validity and type information's correctness.
1566        unsafe { self.table.visitor_prog.visit_var_len(row_data) }
1567            .filter(|vlr| vlr.is_large_blob())
1568            .map(|vlr| {
1569                // SAFETY:
1570                // - Because `vlr.is_large_blob`, it points to exactly one granule.
1571                let granule = unsafe { page.iter_var_len_object(vlr.first_granule) }.next().unwrap();
1572                let blob_hash = granule.blob_hash();
1573                let blob = self.blob_store.retrieve_blob(&blob_hash).unwrap();
1574
1575                blob.len()
1576            })
1577            .sum()
1578    }
1579}
1580
1581impl Serialize for RowRef<'_> {
1582    fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
1583        let table = self.table;
1584        let (page, offset) = table.page_and_offset(self.pointer);
1585        // SAFETY: `self.pointer` points to a valid row in this table, per the `RowRef` invariant.
1586        unsafe { serialize_row_from_page(ser, page, self.blob_store, offset, &table.row_layout) }
1587    }
1588}
1589
1590impl ToBsatn for RowRef<'_> {
1591    /// BSATN-encode the row referred to by `self` into a freshly-allocated `Vec<u8>`.
1592    ///
1593    /// This method will use a [`StaticLayout`] if one is available,
1594    /// and may therefore be faster than calling [`bsatn::to_vec`].
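    ///
    /// A minimal sketch (not compiled here):
    ///
    /// ```ignore
    /// let bytes: Vec<u8> = row_ref.to_bsatn_vec()?;
    /// // When a `StaticLayout` exists, the encoded length is known up front.
    /// if let Some(len) = row_ref.static_bsatn_size() {
    ///     assert_eq!(bytes.len(), len as usize);
    /// }
    /// ```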
1595    fn to_bsatn_vec(&self) -> Result<Vec<u8>, BsatnError> {
1596        if let Some(static_layout) = self.static_layout() {
1597            // Use fast path, by first fetching the row data and then using the static layout.
1598            let row = self.get_row_data();
1599            // SAFETY:
1600            // - Existence of a `RowRef` treated as proof
1601            //   of row's validity and type information's correctness.
1602            Ok(unsafe { static_layout.serialize_row_into_vec(row) })
1603        } else {
1604            bsatn::to_vec(self)
1605        }
1606    }
1607
1608    /// BSATN-encode the row referred to by `self` into `buf`,
1609    /// pushing `self`'s bytes onto the end of `buf`, similar to [`Vec::extend`].
1610    ///
1611    /// This method will use a [`StaticLayout`] if one is available,
1612    /// and may therefore be faster than calling [`bsatn::to_writer`].
1613    fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> Result<(), BsatnError> {
1614        if let Some(static_layout) = self.static_layout() {
1615            // Use fast path, by first fetching the row data and then using the static layout.
1616            let row = self.get_row_data();
1617            // SAFETY:
1618            // - Existence of a `RowRef` treated as proof
1619            //   of row's validity and type information's correctness.
1620            unsafe {
1621                static_layout.serialize_row_extend(buf, row);
1622            }
1623            Ok(())
1624        } else {
1625            // Use the slower, but more general, `bsatn_from` serializer to write the row.
1626            bsatn::to_writer(buf, self)
1627        }
1628    }
1629
1630    fn static_bsatn_size(&self) -> Option<u16> {
1631        self.static_layout().map(|sl| sl.bsatn_length)
1632    }
1633}
1634
1635impl Eq for RowRef<'_> {}
1636impl PartialEq for RowRef<'_> {
1637    fn eq(&self, other: &Self) -> bool {
1638        // Ensure that the layouts are the same
1639        // so that we can use `eq_row_in_page`.
1640        // To do this, we first try address equality on the layouts.
1641        // This should succeed when the rows originate from the same table.
1642        // Otherwise, actually compare the layouts, which is expensive, but unlikely to happen.
1643        let a_ty = self.row_layout();
1644        let b_ty = other.row_layout();
1645        if !(ptr::eq(a_ty, b_ty) || a_ty == b_ty) {
1646            return false;
1647        }
1648        let (page_a, offset_a) = self.page_and_offset();
1649        let (page_b, offset_b) = other.page_and_offset();
1650        let static_layout = self.static_layout();
1651        // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty`
1652        // and `static_layout` is derived from `a_ty`.
1653        unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) }
1654    }
1655}
1656
1657impl PartialEq<ProductValue> for RowRef<'_> {
1658    fn eq(&self, rhs: &ProductValue) -> bool {
1659        let ty = self.row_layout();
1660        let (page, offset) = self.page_and_offset();
1661        // SAFETY: By having `RowRef`,
1662        // we know that `offset` is a valid offset for a row in `page` typed at `ty`.
1663        unsafe { eq_row_in_page_to_pv(self.blob_store, page, offset, rhs, ty) }
1664    }
1665}
1666
1667impl Hash for RowRef<'_> {
1668    fn hash<H: Hasher>(&self, state: &mut H) {
1669        let (page, offset) = self.table.page_and_offset(self.pointer);
1670        let ty = &self.table.row_layout;
1671        // SAFETY: A `RowRef` is a proof that `self.pointer` refers to a live fixed row in `self.table`, so:
1672        // 1. `offset` points at a row in `page` lasting `ty.size()` bytes.
1673        // 2. the row is valid for `ty`.
1674        // 3. for any `vlr: VarLenRef` stored in the row,
1675        //    `vlr.first_offset` is either `NULL` or points to a valid granule in `page`.
1676        unsafe { hash_row_in_page(state, page, self.blob_store, offset, ty) };
1677    }
1678}
1679
1680/// An iterator over all the rows, yielded as [`RowRef`]s, in a table.
1681pub struct TableScanIter<'table> {
1682    /// The current page we're yielding rows from.
1683    /// When `None`, the iterator will attempt to advance to the next page, if any.
1684    current_page: Option<FixedLenRowsIter<'table>>,
1685    /// The current page index we are or will visit.
1686    current_page_idx: PageIndex,
1687    /// The table the iterator is yielding rows from.
1688    pub(crate) table: &'table Table,
1689    /// The `BlobStore` that row references may refer into.
1690    pub(crate) blob_store: &'table dyn BlobStore,
1691}
1692
1693impl<'a> Iterator for TableScanIter<'a> {
1694    type Item = RowRef<'a>;
1695
1696    fn next(&mut self) -> Option<Self::Item> {
1697        // This could have been written using `.flat_map`,
1698        // but we don't have `type Foo = impl Iterator<...>;` on stable yet.
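        //
        // Conceptually, the scan is equivalent to this sketch
        // (the `table_scan_iter_eq_flatmap` test below exercises the executable version):
        //
        //     table.pages().iter()
        //         .zip((0..).map(PageIndex))
        //         .flat_map(|(page, pi)| page.iter_fixed_len(table.row_size())
        //             .map(move |po| RowPointer::new(false, pi, po, table.squashed_offset)))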
1699        loop {
1700            match &mut self.current_page {
1701                // We're currently visiting a page,
1702                Some(iter_fixed_len) => {
1703                    if let Some(page_offset) = iter_fixed_len.next() {
1704                        // There's still at least one row in that page to visit,
1705                        // return a ref to that row.
1706                        let ptr =
1707                            RowPointer::new(false, self.current_page_idx, page_offset, self.table.squashed_offset);
1708
1709                        // SAFETY: `offset` came from the `iter_fixed_len`, so it must point to a valid row.
1710                        let row_ref = unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) };
1711                        return Some(row_ref);
1712                    } else {
1713                        // We've finished visiting that page, so set `current_page` to `None`,
1714                        // increment `self.current_page_idx` to the index of the next page,
1715                        // and go to the `None` case (1) in the match.
1716                        self.current_page = None;
1717                        self.current_page_idx.0 += 1;
1718                    }
1719                }
1720
1721                // (1) If we aren't currently visiting a page,
1722                // the `else` case in the `Some` match arm
1723                // already incremented `self.current_page_idx`,
1724                // or we're just beginning and so it was initialized as 0.
1725                None => {
1726                    // If there's another page, set `self.current_page` to it,
1727                    // and go to the `Some` case in the match.
1728                    let next_page = self.table.pages().get(self.current_page_idx.idx())?;
1729                    let iter = next_page.iter_fixed_len(self.table.row_size());
1730                    self.current_page = Some(iter);
1731                }
1732            }
1733        }
1734    }
1735}
1736
1737/// A combined table and index,
1738/// allowing direct extraction of an [`IndexScanPointIter`] or [`IndexScanRangeIter`].
1739#[derive(Copy, Clone)]
1740pub struct TableAndIndex<'a> {
1741    table: &'a Table,
1742    blob_store: &'a dyn BlobStore,
1743    index: &'a TableIndex,
1744}
1745
1746impl<'a> TableAndIndex<'a> {
1747    pub fn table(&self) -> &'a Table {
1748        self.table
1749    }
1750
1751    pub fn index(&self) -> &'a TableIndex {
1752        self.index
1753    }
1754
1755    /// Wraps `ptr` in a [`RowRef`].
1756    ///
1757    /// # Safety
1758    ///
1759    /// `self.table().is_row_present(ptr)` must hold.
1760    pub unsafe fn combine_with_ptr(&self, ptr: RowPointer) -> RowRef<'a> {
1761        // SAFETY: forward caller requirement.
1762        unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1763    }
1764
1765    /// Returns an iterator yielding all rows in this index for `key`.
1766    ///
1767    /// Matching is defined by `Ord for AlgebraicValue`.
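    ///
    /// A minimal sketch (not compiled here); it assumes `index_and_table` is a
    /// [`TableAndIndex`] whose index covers a single `u64` column:
    ///
    /// ```ignore
    /// for row_ref in index_and_table.seek_point(&AlgebraicValue::U64(42)) {
    ///     // Every yielded row has 42 in the indexed column.
    /// }
    /// ```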
1768    pub fn seek_point(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
1769        IndexScanPointIter {
1770            table: self.table,
1771            blob_store: self.blob_store,
1772            btree_index_iter: self.index.seek_point(key),
1773        }
1774    }
1775
1776    /// Returns an iterator yielding all rows in this index that fall within `range`.
1777    ///
1778    /// Matching is defined by `Ord for AlgebraicValue`.
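    ///
    /// A minimal sketch (not compiled here); it assumes `table` has an index `index_id`
    /// over a single `u64` column and that a `blob_store` is in scope:
    ///
    /// ```ignore
    /// let (lo, hi) = (AlgebraicValue::U64(10), AlgebraicValue::U64(20));
    /// let hits: Vec<RowPointer> = table
    ///     .get_index_by_id_with_table(&blob_store, index_id)
    ///     .expect("index should exist")
    ///     .seek_range(&(lo..=hi))
    ///     .map(|row_ref| row_ref.pointer())
    ///     .collect();
    /// ```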
1779    pub fn seek_range(&self, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanRangeIter<'a> {
1780        IndexScanRangeIter {
1781            table: self.table,
1782            blob_store: self.blob_store,
1783            btree_index_iter: self.index.seek_range(range),
1784        }
1785    }
1786}
1787
1788/// An iterator using a [`TableIndex`] to scan a `table`
1789/// for all the [`RowRef`]s matching the specified `key` in the indexed column(s).
1790///
1791/// Matching is defined by `Ord for AlgebraicValue`.
1792pub struct IndexScanPointIter<'a> {
1793    /// The table being scanned for rows.
1794    table: &'a Table,
1795    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1796    blob_store: &'a dyn BlobStore,
1797    /// The iterator performing the index scan yielding row pointers.
1798    btree_index_iter: TableIndexPointIter<'a>,
1799}
1800
1801impl<'a> IndexScanPointIter<'a> {
1802    /// Consume the iterator, returning the inner one.
1803    pub fn index(self) -> TableIndexPointIter<'a> {
1804        self.btree_index_iter
1805    }
1806}
1807
1808impl<'a> Iterator for IndexScanPointIter<'a> {
1809    type Item = RowRef<'a>;
1810
1811    fn next(&mut self) -> Option<Self::Item> {
1812        self.btree_index_iter.next().map(|ptr| {
1813            // SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table.
1814            unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1815        })
1816    }
1817}
1818
1819/// An iterator using a [`TableIndex`] to scan a `table`
1820/// for all the [`RowRef`]s matching the specified `range` in the indexed column(s).
1821///
1822/// Matching is defined by `Ord for AlgebraicValue`.
1823pub struct IndexScanRangeIter<'a> {
1824    /// The table being scanned for rows.
1825    table: &'a Table,
1826    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1827    blob_store: &'a dyn BlobStore,
1828    /// The iterator performing the index scan yielding row pointers.
1829    btree_index_iter: TableIndexRangeIter<'a>,
1830}
1831
1832impl<'a> Iterator for IndexScanRangeIter<'a> {
1833    type Item = RowRef<'a>;
1834
1835    fn next(&mut self) -> Option<Self::Item> {
1836        self.btree_index_iter.next().map(|ptr| {
1837            // SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table.
1838            unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1839        })
1840    }
1841}
1842
1843#[derive(Error, Debug, PartialEq, Eq)]
1844#[error("Unique constraint violation '{}' in table '{}': column(s): '{:?}' value: {}", constraint_name, table_name, cols, value.to_satn())]
1845pub struct UniqueConstraintViolation {
1846    pub constraint_name: Box<str>,
1847    pub table_name: Box<str>,
1848    pub cols: Vec<Box<str>>,
1849    pub value: AlgebraicValue,
1850}
1851
1852impl UniqueConstraintViolation {
1853    /// Returns a unique constraint violation error for the given `index`
1854    /// and the `value` that would have been duplicated.
1855    ///
1856    /// In this version, the [`IndexSchema`] is looked up in `schema` based on `index_id`.
1857    #[cold]
1858    fn build(schema: &TableSchema, index: &TableIndex, index_id: IndexId, value: AlgebraicValue) -> Self {
1859        let index_schema = schema.indexes.iter().find(|i| i.index_id == index_id).unwrap();
1860        Self::build_with_index_schema(schema, index, index_schema, value)
1861    }
1862
1863    /// Returns a unique constraint violation error for the given `index`
1864    /// and the `value` that would have been duplicated.
1865    ///
1866    /// In this version, the `index_schema` is explicitly passed.
1867    #[cold]
1868    pub fn build_with_index_schema(
1869        schema: &TableSchema,
1870        index: &TableIndex,
1871        index_schema: &IndexSchema,
1872        value: AlgebraicValue,
1873    ) -> Self {
1874        // Fetch the table name.
1875        let table_name = schema.table_name.clone();
1876
1877        // Fetch the names of the columns used in the index.
1878        let cols = schema
1879            .get_columns(&index.indexed_columns)
1880            .map(|(_, cs)| cs.unwrap().col_name.clone())
1881            .collect();
1882
1883        // Fetch the name of the index.
1884        let constraint_name = index_schema.index_name.clone();
1885
1886        Self {
1887            constraint_name,
1888            table_name,
1889            cols,
1890            value,
1891        }
1892    }
1893}
1894
1895// Private API:
1896impl Table {
1897    /// Returns a unique constraint violation error for the given `index`
1898    /// and the `value` that would have been duplicated.
1899    #[cold]
1900    pub fn build_error_unique(
1901        &self,
1902        index: &TableIndex,
1903        index_id: IndexId,
1904        value: AlgebraicValue,
1905    ) -> UniqueConstraintViolation {
1906        let schema = self.get_schema();
1907        UniqueConstraintViolation::build(schema, index, index_id, value)
1908    }
1909
1910    /// Returns a new empty table using the particulars passed.
1911    fn new_raw(
1912        schema: Arc<TableSchema>,
1913        row_layout: RowTypeLayout,
1914        static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
1915        visitor_prog: VarLenVisitorProgram,
1916        squashed_offset: SquashedOffset,
1917        pointer_map: Option<PointerMap>,
1918    ) -> Self {
1919        Self {
1920            inner: TableInner {
1921                row_layout,
1922                static_layout,
1923                visitor_prog,
1924                pages: Pages::default(),
1925            },
1926            is_scheduler: schema.schedule.is_some(),
1927            schema,
1928            indexes: BTreeMap::new(),
1929            pointer_map,
1930            squashed_offset,
1931            row_count: 0,
1932            blob_store_bytes: BlobNumBytes::default(),
1933        }
1934    }
1935
1936    /// Returns whether the row at `ptr` is present or not.
1937    // TODO: Remove all uses of this method,
1938    //       or more likely, gate them behind `debug_assert!`
1939    //       so they don't have semantic meaning.
1940    //
1941    //       Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
1942    //       `RowPointer` is not content-addressed.
1943    //       This means it is possible to:
1944    //       - have a `RowPointer` A* to row A,
1945    //       - Delete row A,
1946    //       - Insert row B into the same storage as freed from A,
1947    //       - Test `is_row_present(A*)`, which falsely reports that row A is still present.
1948    //
1949    //       In the final interface, this method is superfluous anyways,
1950    //       as `RowPointer` is not part of our public interface.
1951    //       Instead, we will always discover a known-present `RowPointer`
1952    //       during a table scan or index seek.
1953    //       As such, our `delete` and `insert` methods can be `unsafe`
1954    //       and trust that the `RowPointer` is valid.
1955    fn is_row_present(&self, ptr: RowPointer) -> bool {
1956        if self.squashed_offset != ptr.squashed_offset() {
1957            return false;
1958        }
1959        let Some((page, offset)) = self.inner.try_page_and_offset(ptr) else {
1960            return false;
1961        };
1962        page.has_row_offset(self.row_size(), offset)
1963    }
1964
1965    /// Returns the row size for a row in the table.
1966    pub fn row_size(&self) -> Size {
1967        self.row_layout().size()
1968    }
1969
1970    /// Returns the layout for a row in the table.
1971    fn row_layout(&self) -> &RowTypeLayout {
1972        &self.inner.row_layout
1973    }
1974
1975    /// Returns the pages storing the physical rows of this table.
1976    fn pages(&self) -> &Pages {
1977        &self.inner.pages
1978    }
1979
1980    /// Iterates over each [`Page`] in this table, ensuring that its hash is computed before yielding it.
1981    ///
1982    /// Used when capturing a snapshot.
1983    pub fn iter_pages_with_hashes(&mut self) -> impl Iterator<Item = (blake3::Hash, &Page)> {
1984        self.inner.pages.iter_mut().map(|page| {
1985            let hash = page.save_or_get_content_hash();
1986            (hash, &**page)
1987        })
1988    }
1989
1990    /// Returns the number of pages storing the physical rows of this table.
1991    fn num_pages(&self) -> usize {
1992        self.inner.pages.len()
1993    }
1994
1995    /// Returns the [`StaticLayout`] for this table, if any.
1996    pub(crate) fn static_layout(&self) -> Option<&StaticLayout> {
1997        self.inner.static_layout.as_ref().map(|(s, _)| s)
1998    }
1999
2000    /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them.
2001    ///
2002    /// Called when restoring from a snapshot after installing the pages,
2003    /// and after computing the row count,
2004    /// since snapshots do not save the pointer map.
2005    fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) -> PointerMap {
2006        // TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`.
2007        // Alternatively, do this at the same time as `compute_row_count`.
2008        self.scan_rows(blob_store)
2009            .map(|row_ref| (row_ref.row_hash(), row_ref.pointer()))
2010            .collect()
2011    }
2012
2013    /// Compute and store `self.row_count` and `self.blob_store_bytes`
2014    /// by iterating over all the rows in `self` and counting them.
2015    ///
2016    /// Called when restoring from a snapshot after installing the pages,
2017    /// since snapshots do not save this metadata.
2018    fn compute_row_count(&mut self, blob_store: &dyn BlobStore) {
2019        let mut row_count = 0;
2020        let mut blob_store_bytes = 0;
2021        for row in self.scan_rows(blob_store) {
2022            row_count += 1;
2023            blob_store_bytes += row.blob_store_bytes();
2024        }
2025        self.row_count = row_count as u64;
2026        self.blob_store_bytes = blob_store_bytes.into();
2027    }
2028}
2029
2030#[cfg(test)]
2031pub(crate) mod test {
2032    use super::*;
2033    use crate::blob_store::{HashMapBlobStore, NullBlobStore};
2034    use crate::page::tests::hash_unmodified_save_get;
2035    use crate::var_len::VarLenGranule;
2036    use proptest::prelude::*;
2037    use proptest::test_runner::TestCaseResult;
2038    use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder};
2039    use spacetimedb_primitives::{col_list, TableId};
2040    use spacetimedb_sats::bsatn::to_vec;
2041    use spacetimedb_sats::proptest::{generate_typed_row, generate_typed_row_vec};
2042    use spacetimedb_sats::{product, AlgebraicType, ArrayValue};
2043    use spacetimedb_schema::def::{BTreeAlgorithm, ModuleDef};
2044    use spacetimedb_schema::schema::Schema as _;
2045
2046    /// Create a `Table` from a `ProductType` without validation.
2047    pub(crate) fn table(ty: ProductType) -> Table {
2048        // Use a fast path here to avoid slowing down Miri in the proptests.
2049        // Does not perform validation.
2050        let schema = TableSchema::from_product_type(ty);
2051        Table::new(schema.into(), SquashedOffset::COMMITTED_STATE)
2052    }
2053
2054    #[test]
2055    fn unique_violation_error() {
2056        let table_name = "UniqueIndexed";
2057        let index_name = "UniqueIndexed_unique_col_idx_btree";
2058        let mut builder = RawModuleDefV9Builder::new();
2059        builder
2060            .build_table_with_new_type(
2061                table_name,
2062                ProductType::from([("unique_col", AlgebraicType::I32), ("other_col", AlgebraicType::I32)]),
2063                true,
2064            )
2065            .with_unique_constraint(0)
2066            .with_index(
2067                RawIndexAlgorithm::BTree { columns: col_list![0] },
2068                "accessor_name_doesnt_matter",
2069            );
2070
2071        let def: ModuleDef = builder.finish().try_into().expect("Failed to build schema");
2072
2073        let schema = TableSchema::from_module_def(&def, def.table(table_name).unwrap(), (), TableId::SENTINEL);
2074        assert_eq!(schema.indexes.len(), 1);
2075        let index_schema = schema.indexes[0].clone();
2076
2077        let mut table = Table::new(schema.into(), SquashedOffset::COMMITTED_STATE);
2078        let pool = PagePool::new_for_test();
2079        let cols = ColList::new(0.into());
2080        let algo = BTreeAlgorithm { columns: cols.clone() }.into();
2081
2082        let index = table.new_index(&algo, true).unwrap();
2083        // SAFETY: Index was derived from `table`.
2084        unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) };
2085
2086        // Reserve a page so that we can check the hash.
2087        let pi = table.inner.pages.reserve_empty_page(&pool, table.row_size()).unwrap();
2088        let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
2089
2090        // Insert the row (0, 0).
2091        table
2092            .insert(&pool, &mut NullBlobStore, &product![0i32, 0i32])
2093            .expect("Initial insert failed");
2094
2095        // Inserting cleared the hash.
2096        let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
2097        assert_ne!(hash_pre_ins, hash_post_ins);
2098
2099        // Try to insert the row (0, 1), and assert that we get the expected error.
2100        match table.insert(&pool, &mut NullBlobStore, &product![0i32, 1i32]) {
2101            Ok(_) => panic!("Second insert with same unique value succeeded"),
2102            Err(InsertError::IndexError(UniqueConstraintViolation {
2103                constraint_name,
2104                table_name,
2105                cols,
2106                value,
2107            })) => {
2108                assert_eq!(&*constraint_name, index_name);
2109                assert_eq!(&*table_name, "UniqueIndexed");
2110                assert_eq!(cols.iter().map(|c| c.to_string()).collect::<Vec<_>>(), &["unique_col"]);
2111                assert_eq!(value, AlgebraicValue::I32(0));
2112            }
2113            Err(e) => panic!("Expected UniqueConstraintViolation but found {:?}", e),
2114        }
2115
2116        // The second insert cleared the hash even though it hit a constraint violation,
2117        // because constraint checking is done after physical insertion and then rolled back.
2118        assert_eq!(table.inner.pages[pi].unmodified_hash(), None);
2119    }
2120
2121    fn insert_retrieve_body(ty: impl Into<ProductType>, val: impl Into<ProductValue>) -> TestCaseResult {
2122        let val = val.into();
2123        let pool = PagePool::new_for_test();
2124        let mut blob_store = HashMapBlobStore::default();
2125        let mut table = table(ty.into());
2126        let (hash, row) = table.insert(&pool, &mut blob_store, &val).unwrap();
2127        let hash = hash.unwrap();
2128        prop_assert_eq!(row.row_hash(), hash);
2129        let ptr = row.pointer();
2130        prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2131
2132        prop_assert_eq!(table.inner.pages.len(), 1);
2133        prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2134
2135        let row_ref = table.get_row_ref(&blob_store, ptr).unwrap();
2136        prop_assert_eq!(row_ref.to_product_value(), val.clone());
2137        let bsatn_val = to_vec(&val).unwrap();
2138        prop_assert_eq!(&bsatn_val, &to_vec(&row_ref).unwrap());
2139        prop_assert_eq!(&bsatn_val, &row_ref.to_bsatn_vec().unwrap());
2140
2141        prop_assert_eq!(
2142            &table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(),
2143            &[ptr]
2144        );
2145
2146        Ok(())
2147    }
2148
2149    #[test]
2150    fn repro_serialize_bsatn_empty_array() {
2151        let ty = AlgebraicType::array(AlgebraicType::U64);
2152        let arr = ArrayValue::from(Vec::<u64>::new().into_boxed_slice());
2153        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2154    }
2155
2156    #[test]
2157    fn repro_serialize_bsatn_debug_assert() {
2158        let ty = AlgebraicType::array(AlgebraicType::U64);
2159        let arr = ArrayValue::from((0..130u64).collect::<Box<_>>());
2160        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2161    }
2162
2163    fn reconstruct_index_num_key_bytes(table: &Table, blob_store: &dyn BlobStore, index_id: IndexId) -> u64 {
2164        let index = table.get_index_by_id(index_id).unwrap();
2165
2166        index
2167            .seek_range(&(..))
2168            .map(|row_ptr| {
2169                let row_ref = table.get_row_ref(blob_store, row_ptr).unwrap();
2170                let key = row_ref.project(&index.indexed_columns).unwrap();
2171                crate::table_index::KeySize::key_size_in_bytes(&key) as u64
2172            })
2173            .sum()
2174    }
2175
2176    /// Given a row type `ty`, a set of rows of that type `vals`,
2177    /// and a set of columns within that type `indexed_columns`,
2178    /// populate a table with `vals`, add an index on the `indexed_columns`,
2179    /// and perform various assertions that the reported index size metrics are correct.
2180    fn test_index_size_reporting(
2181        ty: ProductType,
2182        vals: Vec<ProductValue>,
2183        indexed_columns: ColList,
2184    ) -> Result<(), TestCaseError> {
2185        let pool = PagePool::new_for_test();
2186        let mut blob_store = HashMapBlobStore::default();
2187        let mut table = table(ty.clone());
2188
2189        for row in &vals {
2190            prop_assume!(table.insert(&pool, &mut blob_store, row).is_ok());
2191        }
2192
2193        // We haven't added any indexes yet, so there should be 0 rows in indexes.
2194        prop_assert_eq!(table.num_rows_in_indexes(), 0);
2195
2196        let index_id = IndexId(0);
2197
2198        let algo = BTreeAlgorithm {
2199            columns: indexed_columns.clone(),
2200        }
2201        .into();
2202        let index = TableIndex::new(&ty, &algo, false).unwrap();
2203        // Add an index on `indexed_columns`.
2204        // Safety:
2205        // We're using `ty` as the row type for both `table` and the new index.
2206        unsafe { table.insert_index(&blob_store, index_id, index) };
2207
2208        // We have one index, which should be fully populated,
2209        // so in total we should have the same number of rows in indexes as we have rows.
2210        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows());
2211
2212        let index = table.get_index_by_id(index_id).unwrap();
2213
2214        // One index, so table's reporting of bytes used should match that index's reporting.
2215        prop_assert_eq!(table.bytes_used_by_index_keys(), index.num_key_bytes());
2216
2217        // Walk all the rows in the index, sum their key size,
2218        // and assert it matches the `index.num_key_bytes()`
2219        prop_assert_eq!(
2220            index.num_key_bytes(),
2221            reconstruct_index_num_key_bytes(&table, &blob_store, index_id)
2222        );
2223
2224        // Walk all the rows we inserted, project them to the cols that will be their keys,
2225        // sum their key size,
2226        // and assert it matches the `index.num_key_bytes()`
2227        let key_size_in_pvs = vals
2228            .iter()
2229            .map(|row| crate::table_index::KeySize::key_size_in_bytes(&row.project(&indexed_columns).unwrap()) as u64)
2230            .sum();
2231        prop_assert_eq!(index.num_key_bytes(), key_size_in_pvs);
2232
2233        let algo = BTreeAlgorithm {
2234            columns: indexed_columns,
2235        }
2236        .into();
2237        let index = TableIndex::new(&ty, &algo, false).unwrap();
2238        // Add a duplicate of the same index, so we can check that all above quantities double.
2239        // Safety:
2240        // As above, we're using `ty` as the row type for both `table` and the new index.
2241        unsafe { table.insert_index(&blob_store, IndexId(1), index) };
2242
2243        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2);
2244        prop_assert_eq!(table.bytes_used_by_index_keys(), key_size_in_pvs * 2);
2245
2246        Ok(())
2247    }
2248
2249    proptest! {
2250        #![proptest_config(ProptestConfig { max_shrink_iters: 0x10000000, ..Default::default() })]
2251
2252        #[test]
2253        fn insert_retrieve((ty, val) in generate_typed_row()) {
2254            insert_retrieve_body(ty, val)?;
2255        }
2256
2257        #[test]
2258        fn insert_delete_removed_from_pointer_map((ty, val) in generate_typed_row()) {
2259            let pool = PagePool::new_for_test();
2260            let mut blob_store = HashMapBlobStore::default();
2261            let mut table = table(ty);
2262            let (hash, row) = table.insert(&pool, &mut blob_store, &val).unwrap();
2263            let hash = hash.unwrap();
2264            prop_assert_eq!(row.row_hash(), hash);
2265            let ptr = row.pointer();
2266            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2267
2268            prop_assert_eq!(table.inner.pages.len(), 1);
2269            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2270            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2271            prop_assert_eq!(table.row_count, 1);
2272
2273            let hash_pre_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2274
2275            table.delete(&mut blob_store, ptr, |_| ());
2276
2277            let hash_post_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2278            assert_ne!(hash_pre_del, hash_post_del);
2279
2280            prop_assert_eq!(table.pointers_for(hash), &[]);
2281
2282            prop_assert_eq!(table.inner.pages.len(), 1);
2283            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 0);
2284            prop_assert_eq!(table.row_count, 0);
2285
2286            prop_assert!(&table.scan_rows(&blob_store).next().is_none());
2287        }
2288
2289        #[test]
2290        fn insert_duplicate_set_semantic((ty, val) in generate_typed_row()) {
2291            let pool = PagePool::new_for_test();
2292            let mut blob_store = HashMapBlobStore::default();
2293            let mut table = table(ty);
2294
2295            let (hash, row) = table.insert(&pool, &mut blob_store, &val).unwrap();
2296            let hash = hash.unwrap();
2297            prop_assert_eq!(row.row_hash(), hash);
2298            let ptr = row.pointer();
2299            prop_assert_eq!(table.inner.pages.len(), 1);
2300            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2301            prop_assert_eq!(table.row_count, 1);
2302            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2303
2304            let blob_uses = blob_store.usage_counter();
2305
2306            let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2307
2308            prop_assert!(table.insert(&pool, &mut blob_store, &val).is_err());
2309
2310            // Hash was cleared and is different despite failure to insert.
2311            let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2312            assert_ne!(hash_pre_ins, hash_post_ins);
2313
2314            prop_assert_eq!(table.row_count, 1);
2315            prop_assert_eq!(table.inner.pages.len(), 1);
2316            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2317
2318            let blob_uses_after = blob_store.usage_counter();
2319
2320            prop_assert_eq!(blob_uses_after, blob_uses);
2321            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2322            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2323        }
2324
2325        #[test]
2326        fn insert_bsatn_same_as_pv((ty, val) in generate_typed_row()) {
2327            let pool = PagePool::new_for_test();
2328            let mut bs_pv = HashMapBlobStore::default();
2329            let mut table_pv = table(ty.clone());
2330            let res_pv = table_pv.insert(&pool, &mut bs_pv, &val);
2331
2332            let mut bs_bsatn = HashMapBlobStore::default();
2333            let mut table_bsatn = table(ty);
2334            let res_bsatn = insert_bsatn(&mut table_bsatn, &mut bs_bsatn, &val);
2335
2336            prop_assert_eq!(res_pv, res_bsatn);
2337            prop_assert_eq!(bs_pv, bs_bsatn);
2338            prop_assert_eq!(table_pv, table_bsatn);
2339        }
2340
2341        #[test]
2342        fn row_size_reporting_matches_slow_implementations((ty, vals) in generate_typed_row_vec(128, 2048)) {
2343            let pool = PagePool::new_for_test();
2344            let mut blob_store = HashMapBlobStore::default();
2345            let mut table = table(ty.clone());
2346
2347            for row in &vals {
2348                prop_assume!(table.insert(&pool, &mut blob_store, row).is_ok());
2349            }
2350
2351            prop_assert_eq!(table.bytes_used_by_rows(), table.reconstruct_bytes_used_by_rows());
2352            prop_assert_eq!(table.num_rows(), table.reconstruct_num_rows());
2353            prop_assert_eq!(table.num_rows(), vals.len() as u64);
2354
2355            // TODO(testing): Determine if there's a meaningful way to test that the blob store reporting is correct.
2356            // I (pgoldman 2025-01-27) doubt it, as the test would be "visit every blob and sum their size,"
2357            // which is already what the actual implementation does.
2358        }
2359
2360        #[test]
2361        fn index_size_reporting_matches_slow_implementations_single_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2362            prop_assume!(!ty.elements.is_empty());
2363
2364            test_index_size_reporting(ty, vals, ColList::from(ColId(0)))?;
2365        }
2366
2367        #[test]
2368        fn index_size_reporting_matches_slow_implementations_two_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2369            prop_assume!(ty.elements.len() >= 2);
2370
2371
2372            test_index_size_reporting(ty, vals, ColList::from([ColId(0), ColId(1)]))?;
2373        }
2374    }
2375
2376    fn insert_bsatn<'a>(
2377        table: &'a mut Table,
2378        blob_store: &'a mut dyn BlobStore,
2379        val: &ProductValue,
2380    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
2381        let row = &to_vec(&val).unwrap();
2382
2383        // Optimistically insert the `row` before checking any constraints
2384        // under the assumption that errors (unique constraint & set semantic violations) are rare.
2385        let pool = PagePool::new_for_test();
2386        let (row_ref, blob_bytes) = table.insert_physically_bsatn(&pool, blob_store, row)?;
2387        let row_ptr = row_ref.pointer();
2388
2389        // Confirm the insertion, checking any constraints, removing the physical row on error.
2390        // SAFETY: We just inserted `ptr`, so it must be present.
2391        let (hash, row_ptr) = unsafe { table.confirm_insertion::<true>(blob_store, row_ptr, blob_bytes) }?;
2392        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
2393        let row_ref = unsafe { table.get_row_ref_unchecked(blob_store, row_ptr) };
2394        Ok((hash, row_ref))
2395    }
2396
2397    // Compare `scan_rows` against a simpler implementation.
2398    #[test]
2399    fn table_scan_iter_eq_flatmap() {
2400        let pool = PagePool::new_for_test();
2401        let mut blob_store = HashMapBlobStore::default();
2402        let mut table = table(AlgebraicType::U64.into());
2403        for v in 0..2u64.pow(14) {
2404            table.insert(&pool, &mut blob_store, &product![v]).unwrap();
2405        }
2406
2407        let complex = table.scan_rows(&blob_store).map(|r| r.pointer());
2408        let simple = table
2409            .inner
2410            .pages
2411            .iter()
2412            .zip((0..).map(PageIndex))
2413            .flat_map(|(page, pi)| {
2414                page.iter_fixed_len(table.row_size())
2415                    .map(move |po| RowPointer::new(false, pi, po, table.squashed_offset))
2416            });
2417        assert!(complex.eq(simple));
2418    }
2419
2420    #[test]
2421    #[should_panic]
2422    fn read_row_unaligned_page_offset_soundness() {
2423        // Insert a `u64` into a table.
2424        let pt = AlgebraicType::U64.into();
2425        let pv = product![42u64];
2426        let mut table = table(pt);
2427        let pool = &PagePool::new_for_test();
2428        let blob_store = &mut NullBlobStore;
2429        let (_, row_ref) = table.insert(pool, blob_store, &pv).unwrap();
2430
2431        // Manipulate the page offset to 1 instead of 0.
2432        // This now points into the "middle" of a row.
2433        let ptr = row_ref.pointer().with_page_offset(PageOffset(1));
2434
2435        // We expect this to panic.
2436        // Miri should not have any issue with this call either.
2437        table.get_row_ref(&NullBlobStore, ptr).unwrap().to_product_value();
2438    }
2439
2440    #[test]
2441    fn test_blob_store_bytes() {
2442        let pt: ProductType = [AlgebraicType::String, AlgebraicType::I32].into();
2443        let pool = &PagePool::new_for_test();
2444        let blob_store = &mut HashMapBlobStore::default();
2445        let mut insert = |table: &mut Table, string, num| {
2446            table
2447                .insert(pool, blob_store, &product![string, num])
2448                .unwrap()
2449                .1
2450                .pointer()
2451        };
2452        let mut table1 = table(pt.clone());
2453
2454        // Insert short string, `blob_store_bytes` should be 0.
2455        let short_str = std::str::from_utf8(&[98; 6]).unwrap();
2456        let short_row_ptr = insert(&mut table1, short_str, 0);
2457        assert_eq!(table1.blob_store_bytes.0, 0);
2458
2459        // Insert long string, `blob_store_bytes` should be the length of the string.
2460        const BLOB_OBJ_LEN: BlobNumBytes = BlobNumBytes(VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD + 1);
2461        let long_str = std::str::from_utf8(&[98; BLOB_OBJ_LEN.0]).unwrap();
2462        let long_row_ptr = insert(&mut table1, long_str, 0);
2463        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2464
2465        // Insert previous long string in the same table,
2466        // `blob_store_bytes` should count the length twice,
2467        // even though `HashMapBlobStore` deduplicates it.
2468        let long_row_ptr2 = insert(&mut table1, long_str, 1);
2469        const BLOB_OBJ_LEN_2X: BlobNumBytes = BlobNumBytes(BLOB_OBJ_LEN.0 * 2);
2470        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2471
2472        // Insert previous long string in a new table,
2473        // `blob_store_bytes` should show the length,
2474        // even though `HashMapBlobStore` deduplicates it.
2475        let mut table2 = table(pt);
2476        let _ = insert(&mut table2, long_str, 0);
2477        assert_eq!(table2.blob_store_bytes, BLOB_OBJ_LEN);
2478
2479        // Delete `short_str` row. This should not affect the byte count.
2480        table1.delete(blob_store, short_row_ptr, |_| ()).unwrap();
2481        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2482
2483        // Delete the first long string row. This gets us down to `BLOB_OBJ_LEN` (we had 2x before).
2484        table1.delete(blob_store, long_row_ptr, |_| ()).unwrap();
2485        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2486
2487        // Delete the second long string row. This gets us down to 0 (we've now deleted 2x).
2488        table1.delete(blob_store, long_row_ptr2, |_| ()).unwrap();
2489        assert_eq!(table1.blob_store_bytes, 0.into());
2490    }
2491
2492    /// Assert that calling `get_row_ref` to get a row ref to a non-existent `RowPointer`
2493    /// does not panic.
2494    #[test]
2495    fn get_row_ref_no_panic() {
2496        let blob_store = &mut HashMapBlobStore::default();
2497        let table = table([AlgebraicType::String, AlgebraicType::I32].into());
2498
2499        // This row pointer has an incorrect `SquashedOffset`, and so does not point into `table`.
2500        assert!(table
2501            .get_row_ref(
2502                blob_store,
2503                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::TX_STATE),
2504            )
2505            .is_none());
2506
2507        // This row pointer has the correct `SquashedOffset`, but points out-of-bounds within `table`.
2508        assert!(table
2509            .get_row_ref(
2510                blob_store,
2511                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::COMMITTED_STATE),
2512            )
2513            .is_none());
2514    }
2515}