spacetimedb_table/
table.rs

1use super::{
2    bflatn_from::serialize_row_from_page,
3    bflatn_to::{write_row_to_pages, write_row_to_pages_bsatn, Error},
4    blob_store::BlobStore,
5    eq::eq_row_in_page,
6    eq_to_pv::eq_row_in_page_to_pv,
7    indexes::{Bytes, PageIndex, PageOffset, RowHash, RowPointer, SquashedOffset, PAGE_DATA_SIZE},
8    page::{FixedLenRowsIter, Page},
9    page_pool::PagePool,
10    pages::Pages,
11    pointer_map::PointerMap,
12    read_column::{ReadColumn, TypeError},
13    row_hash::hash_row_in_page,
14    row_type_visitor::{row_type_visitor, VarLenVisitorProgram},
15    static_assert_size,
16    static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator},
17    static_layout::StaticLayout,
18    table_index::{TableIndex, TableIndexPointIter, TableIndexRangeIter},
19    var_len::VarLenMembers,
20};
21use core::{fmt, ptr};
22use core::{
23    hash::{Hash, Hasher},
24    hint::unreachable_unchecked,
25};
26use core::{mem, ops::RangeBounds};
27use derive_more::{Add, AddAssign, From, Sub, SubAssign};
28use enum_as_inner::EnumAsInner;
29use smallvec::SmallVec;
30use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned};
31use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId};
32use spacetimedb_sats::layout::{AlgebraicTypeLayout, PrimitiveType, RowTypeLayout, Size};
33use spacetimedb_sats::memory_usage::MemoryUsage;
34use spacetimedb_sats::{
35    algebraic_value::ser::ValueSerializer,
36    bsatn::{self, ser::BsatnError, ToBsatn},
37    i256,
38    product_value::InvalidFieldError,
39    satn::Satn,
40    ser::{Serialize, Serializer},
41    u256, AlgebraicValue, ProductType, ProductValue,
42};
43use spacetimedb_schema::{
44    def::IndexAlgorithm,
45    schema::{columns_to_row_type, ColumnSchema, IndexSchema, TableSchema},
46};
47use std::{
48    collections::{btree_map, BTreeMap},
49    sync::Arc,
50};
51use thiserror::Error;
52
53/// The number of bytes used by, added to, or removed from a [`Table`]'s share of a [`BlobStore`].
54#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign, SubAssign)]
55pub struct BlobNumBytes(usize);
56
57impl MemoryUsage for BlobNumBytes {}
58
59pub type SeqIdList = SmallVec<[SequenceId; 4]>;
60static_assert_size!(SeqIdList, 24);
61
62/// A database table containing the row schema, the rows, and indices.
63///
64/// The table stores the rows in a page manager
65/// and uses an internal map to ensure that no identical row is stored more than once.
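///
/// # Example
///
/// A minimal usage sketch, not a doctest from this crate: it assumes a
/// `schema: Arc<TableSchema>`, a `pool: &PagePool`, and a `blob_store: &mut dyn BlobStore`
/// (e.g. a `HashMapBlobStore`) are already set up, and uses the `product!` macro
/// from `spacetimedb_sats` to build a row value.
///
/// ```ignore
/// // Create an empty committed-state table for `schema`.
/// let mut table = Table::new(schema, SquashedOffset::COMMITTED_STATE);
///
/// // Insert a row and remember where it landed.
/// let row = product![1u64, "hello"];
/// let (_hash, row_ref) = table.insert(pool, blob_store, &row)?;
/// let ptr = row_ref.pointer();
///
/// // Inserting the same value again violates set semantics and is rejected.
/// assert!(matches!(
///     table.insert(pool, blob_store, &row),
///     Err(InsertError::Duplicate(_))
/// ));
/// ```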
66#[derive(Debug, PartialEq, Eq)]
67pub struct Table {
68    /// Page manager and row layout grouped together, for `RowRef` purposes.
69    inner: TableInner,
70    /// Maps `RowHash -> [RowPointer]` where a [`RowPointer`] points into `pages`.
71    /// A [`PointerMap`] is effectively a specialized unique index on all the columns.
72    ///
73    /// In tables without any other unique constraints,
74    /// the pointer map is used to enforce set semantics,
75    /// i.e. to prevent duplicate rows.
76    /// If `self.indexes` contains at least one unique index,
77    /// duplicate rows are impossible regardless, so this will be `None`.
78    pointer_map: Option<PointerMap>,
79    /// The indices associated with a set of columns of the table.
80    pub indexes: BTreeMap<IndexId, TableIndex>,
81    /// The schema of the table, from which the row type and other details are derived.
82    pub schema: Arc<TableSchema>,
83    /// `SquashedOffset::TX_STATE` or `SquashedOffset::COMMITTED_STATE`
84    /// depending on whether this is a tx scratchpad table
85    /// or a committed table.
86    squashed_offset: SquashedOffset,
87    /// Stores the number of rows present in the table.
88    pub row_count: u64,
89    /// Stores the total number of bytes that the table's blob objects occupy.
90    ///
91    /// Note that the [`HashMapBlobStore`] does ref-counting and de-duplication,
92    /// but this sum will count an object each time its hash is mentioned, rather than just once.
93    blob_store_bytes: BlobNumBytes,
94    /// Indicates whether this is a scheduler table or not.
95    ///
96    /// This is an optimization to avoid checking the schema in e.g., `InstanceEnv::{insert, update}`.
97    is_scheduler: bool,
98}
99
100type StaticLayoutInTable = Option<(StaticLayout, StaticBsatnValidator)>;
101
102/// The part of a `Table` concerned only with storing rows.
103///
104/// Separated from the "outer" parts of `Table`, especially the `indexes`,
105/// so that `RowRef` can borrow only the `TableInner`,
106/// while other mutable references to the `indexes` exist.
107/// This is necessary because index insertions and deletions take a `RowRef` as an argument,
108/// from which they [`ReadColumn::read_column`] their keys.
109#[derive(Debug, PartialEq, Eq)]
110pub(crate) struct TableInner {
111    /// The type of rows this table stores, with layout information included.
112    row_layout: RowTypeLayout,
113    /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion,
114    /// if the [`RowTypeLayout`] has a static BSATN length and layout.
115    ///
116    /// A [`StaticBsatnValidator`] is also included.
117    /// It's used to validate BSATN-encoded rows before converting to BFLATN.
118    static_layout: StaticLayoutInTable,
119    /// The visitor program for `row_layout`.
120    ///
121    /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it.
122    visitor_prog: VarLenVisitorProgram,
123    /// The page manager that holds rows
124    /// including both their fixed and variable components.
125    pages: Pages,
126}
127
128impl TableInner {
129    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
130    ///
131    /// # Safety
132    ///
133    /// The requirement is that `table.is_row_present(ptr)` must hold,
134    /// where `table` is the `Table` which contains this `TableInner`.
135    /// That is, `ptr` must refer to a row within `self`
136    /// which was previously inserted and has not been deleted since.
137    ///
138    /// This means:
139    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
140    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
141    ///   and must refer to a valid, live row in that page.
142    /// - The `SquashedOffset` of `ptr` must match the enclosing table's `table.squashed_offset`.
143    ///
144    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
145    /// and has not been passed to [`Table::delete_internal_skip_pointer_map(table, ..)`]
146    /// is sufficient to demonstrate all of these properties.
147    unsafe fn get_row_ref_unchecked<'a>(
148        &'a self,
149        blob_store: &'a dyn BlobStore,
150        squashed_offset: SquashedOffset,
151        ptr: RowPointer,
152    ) -> RowRef<'a> {
153        // SAFETY: Forward caller requirements.
154        unsafe { RowRef::new(self, blob_store, squashed_offset, ptr) }
155    }
156
157    /// Returns whether the row at `ptr` is present or not.
158    // TODO: Remove all uses of this method,
159    //       or more likely, gate them behind `debug_assert!`
160    //       so they don't have semantic meaning.
161    //
162    //       Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
163    //       `RowPointer` is not content-addressed.
164    //       This means it is possible to:
165    //       - have a `RowPointer` A* to row A,
166    //       - Delete row A,
167    //       - Insert row B into the same storage as freed from A,
168    //       - Test `is_row_present(A*)`, which falsely reports that row A is still present.
169    //
170    //       In the final interface, this method is superfluous anyways,
171    //       as `RowPointer` is not part of our public interface.
172    //       Instead, we will always discover a known-present `RowPointer`
173    //       during a table scan or index seek.
174    //       As such, our `delete` and `insert` methods can be `unsafe`
175    //       and trust that the `RowPointer` is valid.
176    fn is_row_present(&self, squashed_offset: SquashedOffset, ptr: RowPointer) -> bool {
177        if squashed_offset != ptr.squashed_offset() {
178            return false;
179        }
180        let Some((page, offset)) = self.try_page_and_offset(ptr) else {
181            return false;
182        };
183        page.has_row_offset(self.row_layout.size(), offset)
184    }
185
186    fn try_page_and_offset(&self, ptr: RowPointer) -> Option<(&Page, PageOffset)> {
187        (ptr.page_index().idx() < self.pages.len()).then(|| (&self.pages[ptr.page_index()], ptr.page_offset()))
188    }
189
190    /// Returns the page and page offset that `ptr` points to.
191    fn page_and_offset(&self, ptr: RowPointer) -> (&Page, PageOffset) {
192        self.try_page_and_offset(ptr).unwrap()
193    }
194}
195
196static_assert_size!(Table, 264);
197
198impl MemoryUsage for Table {
199    fn heap_usage(&self) -> usize {
200        let Self {
201            inner,
202            pointer_map,
203            indexes,
204            // MEMUSE: intentionally ignoring schema
205            schema: _,
206            squashed_offset,
207            row_count,
208            blob_store_bytes,
209            is_scheduler,
210        } = self;
211        inner.heap_usage()
212            + pointer_map.heap_usage()
213            + indexes.heap_usage()
214            + squashed_offset.heap_usage()
215            + row_count.heap_usage()
216            + blob_store_bytes.heap_usage()
217            + is_scheduler.heap_usage()
218    }
219}
220
221impl MemoryUsage for TableInner {
222    fn heap_usage(&self) -> usize {
223        let Self {
224            row_layout,
225            static_layout,
226            visitor_prog,
227            pages,
228        } = self;
229        row_layout.heap_usage() + static_layout.heap_usage() + visitor_prog.heap_usage() + pages.heap_usage()
230    }
231}
232
233/// There was already a row with the same value.
234#[derive(Error, Debug, PartialEq, Eq)]
235#[error("Duplicate insertion of row {0:?} violates set semantics")]
236pub struct DuplicateError(pub RowPointer);
237
238/// Various errors that can happen on table insertion.
239#[derive(Error, Debug, PartialEq, Eq, EnumAsInner)]
240pub enum InsertError {
241    /// There was already a row with the same value.
242    #[error(transparent)]
243    Duplicate(#[from] DuplicateError),
244
245    /// Couldn't write the row to the page manager.
246    #[error(transparent)]
247    Bflatn(#[from] super::bflatn_to::Error),
248
249    /// Some index-related error occurred.
250    #[error(transparent)]
251    IndexError(#[from] UniqueConstraintViolation),
252}
253
254/// Errors that can occur while trying to read a value via BSATN.
255#[derive(Error, Debug)]
256pub enum ReadViaBsatnError {
257    #[error(transparent)]
258    BSatnError(#[from] BsatnError),
259
260    #[error(transparent)]
261    DecodeError(#[from] DecodeError),
262}
263
264#[derive(Error, Debug)]
265#[error("Cannot change the columns of table `{table_name}` with id {table_id} from `{old:?}` to `{new:?}`")]
266pub struct ChangeColumnsError {
267    table_id: TableId,
268    table_name: Box<str>,
269    old: Vec<ColumnSchema>,
270    new: Vec<ColumnSchema>,
271}
272
273/// Computes the parts of a table definition that depend on the row type.
274fn table_row_type_dependents(row_type: ProductType) -> (RowTypeLayout, StaticLayoutInTable, VarLenVisitorProgram) {
275    let row_layout: RowTypeLayout = row_type.into();
276    let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout)));
277    let visitor_prog = row_type_visitor(&row_layout);
278
279    (row_layout, static_layout, visitor_prog)
280}
281
282// Public API:
283impl Table {
284    /// Creates a new empty table with the given `schema` and `squashed_offset`.
285    pub fn new(schema: Arc<TableSchema>, squashed_offset: SquashedOffset) -> Self {
286        let (row_layout, static_layout, visitor_prog) = table_row_type_dependents(schema.get_row_type().clone());
287
288        // By default, we start off with an empty pointer map,
289        // which is removed when the first unique index is added.
290        let pm = Some(PointerMap::default());
291        Self::new_raw(schema, row_layout, static_layout, visitor_prog, squashed_offset, pm)
292    }
293
294    /// Changes the columns of `self` to those in `column_schemas`
295    /// and returns the old column schemas.
296    ///
297    /// Returns an error if the new list of columns is incompatible with the old.
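    ///
    /// # Example
    ///
    /// A hedged sketch of the call shape; `new_columns` is a hypothetical
    /// `Vec<ColumnSchema>` whose layout is compatible with the existing columns.
    ///
    /// ```ignore
    /// let old_columns = table.change_columns_to(new_columns)?;
    /// // `old_columns` holds the previous `ColumnSchema`s, e.g. for building a rollback.
    /// ```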
298    pub fn change_columns_to(
299        &mut self,
300        column_schemas: Vec<ColumnSchema>,
301    ) -> Result<Vec<ColumnSchema>, ChangeColumnsError> {
302        fn validate(
303            this: &Table,
304            new_row_layout: &RowTypeLayout,
305            column_schemas: &[ColumnSchema],
306        ) -> Result<(), ChangeColumnsError> {
307            // Validate that the old row type layout can be changed to the new.
308            let schema = this.get_schema();
309            let row_layout = this.row_layout();
310
311            // Require that a scheduler table doesn't change the `id` and `at` fields.
312            let schedule_compat = schema.schedule.as_ref().zip(schema.pk()).is_none_or(|(schedule, pk)| {
313                let at_col = schedule.at_column.idx();
314                let id_col = pk.col_pos.idx();
315                row_layout[at_col] == new_row_layout[at_col] && row_layout[id_col] == new_row_layout[id_col]
316            });
317
318            // The `row_layout` must also be compatible with the new.
319            if schedule_compat && row_layout.is_compatible_with(new_row_layout) {
320                return Ok(());
321            }
322
323            Err(ChangeColumnsError {
324                table_id: schema.table_id,
325                table_name: schema.table_name.clone(),
326                old: schema.columns().to_vec(),
327                new: column_schemas.to_vec(),
328            })
329        }
330
331        unsafe { self.change_columns_to_unchecked(column_schemas, validate) }
332    }
333
334    /// Changes the columns of `self` to those in `column_schemas`
335    /// and returns the old column schemas.
336    ///
337    /// Returns an error if the new list of columns is incompatible with the old.
338    ///
339    /// # Safety
340    ///
341    /// The caller must ensure, using `validate`,
342    /// that `new_row_layout` is compatible with the rows existing in `self`.
343    pub unsafe fn change_columns_to_unchecked<E>(
344        &mut self,
345        column_schemas: Vec<ColumnSchema>,
346        validate: impl FnOnce(&Self, &RowTypeLayout, &[ColumnSchema]) -> Result<(), E>,
347    ) -> Result<Vec<ColumnSchema>, E> {
348        // Compute the new row type, layout, and related stuff.
349        let new_row_type: ProductType = columns_to_row_type(&column_schemas);
350        let (new_row_layout, new_static_layout, new_visitor_prog) = table_row_type_dependents(new_row_type.clone());
351
352        validate(self, &new_row_layout, &column_schemas)?;
353
354        // Set the new layout and friends.
355        self.inner.row_layout = new_row_layout;
356        self.inner.static_layout = new_static_layout;
357        self.inner.visitor_prog = new_visitor_prog;
358
359        // Update the schema.
360        let old_column_schemas = self.with_mut_schema(|s| {
361            s.row_type = new_row_type;
362            mem::replace(&mut s.columns, column_schemas)
363        });
364
365        // Recompute the index types.
366        self.compute_index_types();
367
368        Ok(old_column_schemas)
369    }
370
371    /// Change the row layout and schema to those of `other`.
372    ///
373    /// # Safety
374    ///
375    /// This is safe when a `ChangeColumnsError` would not occur
376    /// when using `other.get_schema().columns.clone()`.
377    /// The actual safety requirements are more complex, but the condition above
378    /// is a superset of them and therefore sufficient.
379    pub unsafe fn set_layout_and_schema_to(&mut self, other: &Table) {
380        self.inner.row_layout = other.inner.row_layout.clone();
381        self.inner.static_layout = other.inner.static_layout.clone();
382        self.inner.visitor_prog = other.inner.visitor_prog.clone();
383
384        self.use_schema_of(other);
385        self.compute_index_types();
386    }
387
388    /// Re-computes the index key types.
389    fn compute_index_types(&mut self) {
390        let schema = self.get_schema().clone();
391        let row_type = schema.get_row_type();
392        for index in self.indexes.values_mut() {
393            index.key_type = row_type
394                .project(&index.indexed_columns)
395                .expect("new row type should have as many columns as before")
396        }
397    }
398
399    /// Returns whether this is a scheduler table.
400    pub fn is_scheduler(&self) -> bool {
401        self.is_scheduler
402    }
403
404    /// Check if the `row` conflicts with any unique index on `self`,
405    /// and if there is a conflict, return `Err`.
406    ///
407    /// `is_deleted` is a predicate which, for a given row pointer,
408    /// returns true if and only if that row should be ignored.
409    /// While checking unique constraints against the committed state,
410    /// `MutTxId::insert` will ignore rows which are listed in the delete table.
411    ///
412    /// # Safety
413    ///
414    /// `row.row_layout() == self.row_layout()` must hold.
415    pub unsafe fn check_unique_constraints<'a, I: Iterator<Item = (&'a IndexId, &'a TableIndex)>>(
416        &'a self,
417        row: RowRef<'_>,
418        adapt: impl FnOnce(btree_map::Iter<'a, IndexId, TableIndex>) -> I,
419        mut is_deleted: impl FnMut(RowPointer) -> bool,
420    ) -> Result<(), UniqueConstraintViolation> {
421        for (&index_id, index) in adapt(self.indexes.iter()).filter(|(_, index)| index.is_unique()) {
422            // SAFETY: Caller promised that `row` has the same layout as `self`.
423            // Thus, as `index.indexed_columns` is in-bounds of `self`'s layout,
424            // it's also in-bounds of `row`'s layout.
425            let value = unsafe { row.project_unchecked(&index.indexed_columns) };
426            if index.seek_point(&value).next().is_some_and(|ptr| !is_deleted(ptr)) {
427                return Err(self.build_error_unique(index, index_id, value));
428            }
429        }
430        Ok(())
431    }
432
433    /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
434    ///
435    /// On success, returns the hash, if any, of the newly-inserted row,
436    /// and a `RowRef` referring to the row.
437    /// The hash is only computed if this table has a [`PointerMap`],
438    /// i.e., does not have any unique indexes.
439    /// If the table has unique indexes,
440    /// the returned `Option<RowHash>` will be `None`.
441    ///
442    /// When a row equal to `row` already exists in `self`,
443    /// returns `InsertError::Duplicate(existing_row_pointer)`,
444    /// where `existing_row_pointer` is a `RowPointer` which identifies the existing row.
445    /// In this case, the duplicate is not inserted,
446    /// but internal data structures may be altered in ways that affect performance and fragmentation.
447    ///
448    /// TODO(error-handling): describe errors from `write_row_to_pages` and return meaningful errors.
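    ///
    /// # Example
    ///
    /// A usage sketch (assuming `pool`, `blob_store`, and a `ProductValue` in `row`
    /// are in scope); it shows how both outcomes are surfaced to the caller:
    ///
    /// ```ignore
    /// match table.insert(pool, blob_store, &row) {
    ///     // `hash` is `Some(_)` only for tables that still use a pointer map,
    ///     // i.e. tables without any unique index.
    ///     Ok((hash, row_ref)) => println!("inserted at {:?}, hash: {hash:?}", row_ref.pointer()),
    ///     // On a set-semantic duplicate, the pointer to the pre-existing row is reported.
    ///     Err(InsertError::Duplicate(DuplicateError(existing))) => {
    ///         println!("row already present at {existing:?}");
    ///     }
    ///     Err(e) => return Err(e.into()),
    /// }
    /// ```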
449    pub fn insert<'a>(
450        &'a mut self,
451        pool: &PagePool,
452        blob_store: &'a mut dyn BlobStore,
453        row: &ProductValue,
454    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
455        // Optimistically insert the `row` before checking any constraints
456        // under the assumption that errors (unique constraint & set semantic violations) are rare.
457        let (row_ref, blob_bytes) = self.insert_physically_pv(pool, blob_store, row)?;
458        let row_ptr = row_ref.pointer();
459
460        // Confirm the insertion, checking any constraints, removing the physical row on error.
461        // SAFETY: We just inserted `row_ptr`, so it must be present.
462        // Re. `CHECK_SAME_ROW = true`,
463        // where `insert` is called, we are not dealing with transactions,
464        // and we already know there cannot be a duplicate row error,
465        // but we check just in case.
466        let (hash, row_ptr) = unsafe { self.confirm_insertion::<true>(blob_store, row_ptr, blob_bytes) }?;
467        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
468        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row_ptr) };
469        Ok((hash, row_ref))
470    }
471
472    /// Physically inserts `row` into the page manager
473    /// without inserting it logically into the pointer map.
474    ///
475    /// This is useful when we need to insert a row temporarily to get back a `RowPointer`.
476    /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
477    pub fn insert_physically_pv<'a>(
478        &'a mut self,
479        pool: &PagePool,
480        blob_store: &'a mut dyn BlobStore,
481        row: &ProductValue,
482    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
483        // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
484        // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
485        let (ptr, blob_bytes) = unsafe {
486            write_row_to_pages(
487                pool,
488                &mut self.inner.pages,
489                &self.inner.visitor_prog,
490                blob_store,
491                &self.inner.row_layout,
492                row,
493                self.squashed_offset,
494            )
495        }?;
496        // SAFETY: We just inserted `ptr`, so it must be present.
497        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
498
499        Ok((row_ref, blob_bytes))
500    }
501
502    /// Physically insert a `row`, encoded in BSATN, into this table,
503    /// storing its large var-len members in the `blob_store`.
504    ///
505    /// On success, returns a `RowRef` referring to the newly-inserted row
506    /// and the number of bytes added to the blob store.
507    ///
508    /// This does not check for set semantic or unique constraints.
509    ///
510    /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`.
511    /// In this case, a call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
512    ///
513    /// When `row` is not valid BSATN at the table's row type,
514    /// an error is returned and there will be nothing for the caller to revert.
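    ///
    /// # Example
    ///
    /// A sketch of the two-phase flow this method enables (assuming `pool`, `blob_store`,
    /// and a BSATN-encoded row in `bytes: &[u8]` are in scope):
    ///
    /// ```ignore
    /// // Physically write the row; no uniqueness or set-semantics checks happen here.
    /// let (row_ref, blob_bytes) = table.insert_physically_bsatn(pool, blob_store, bytes)?;
    /// let ptr = row_ref.pointer();
    ///
    /// // Afterwards, either confirm the insertion (checking constraints)...
    /// // unsafe { table.confirm_insertion::<true>(blob_store, ptr, blob_bytes) }?;
    /// // ...or roll the physical insertion back:
    /// // unsafe { table.delete_internal_skip_pointer_map(blob_store, ptr) };
    /// ```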
515    pub fn insert_physically_bsatn<'a>(
516        &'a mut self,
517        pool: &PagePool,
518        blob_store: &'a mut dyn BlobStore,
519        row: &[u8],
520    ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
521        // Got a static layout? => Use fast-path insertion.
522        let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() {
523            // Before inserting, validate the row, ensuring type safety.
524            // SAFETY: The `static_validator` was derived from the same row layout as the static layout.
525            unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?;
526
527            let fixed_row_size = self.inner.row_layout.size();
528            let squashed_offset = self.squashed_offset;
529            let res = self
530                .inner
531                .pages
532                .with_page_to_insert_row(pool, fixed_row_size, 0, |page| {
533                    // SAFETY: We've used the right `row_size` and we trust that others have too.
534                    // `RowTypeLayout` also ensures that we satisfy the minimum row size.
535                    let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?;
536                    let (mut fixed, _) = page.split_fixed_var_mut();
537                    let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size);
538                    // SAFETY:
539                    // - We've validated that `row` is of sufficient length.
540                    // - The `fixed_buf` is exactly the right `fixed_row_size`.
541                    unsafe { static_layout.deserialize_row_into(fixed_buf, row) };
542                    Ok(fixed_offset)
543                })
544                .map_err(Error::PagesError)?;
545            match res {
546                (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()),
547                (_, Err(e)) => return Err(e),
548            }
549        } else {
550            // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
551            // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
552            unsafe {
553                write_row_to_pages_bsatn(
554                    pool,
555                    &mut self.inner.pages,
556                    &self.inner.visitor_prog,
557                    blob_store,
558                    &self.inner.row_layout,
559                    row,
560                    self.squashed_offset,
561                )
562            }?
563        };
564
565        // SAFETY: We just inserted `ptr`, so it must be present.
566        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
567
568        Ok((row_ref, blob_bytes))
569    }
570
571    /// Returns all the columns with sequences that need generation for this `row`.
572    ///
573    /// # Safety
574    ///
575    /// `self.is_row_present(row)` must hold.
576    pub unsafe fn sequence_triggers_for<'a>(
577        &'a self,
578        blob_store: &'a dyn BlobStore,
579        row: RowPointer,
580    ) -> (ColList, SeqIdList) {
581        let sequences = &*self.get_schema().sequences;
582        let row_ty = self.row_layout().product();
583
584        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
585        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) };
586
587        sequences
588            .iter()
589            // Find all the sequences that are triggered by this row.
590            .filter(|seq| {
591                // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements`
592                // as `row_ty` was derived from the same schema as `seq` is part of.
593                let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) };
594                // SAFETY:
595                // - `elem_ty` appears as a column in the row type.
596                // - `AlgebraicValue` is compatible with all types.
597                let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) };
598                val.is_numeric_zero()
599            })
600            .map(|seq| (seq.col_pos, seq.sequence_id))
601            .unzip()
602    }
603
604    /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`.
605    ///
606    /// Truncates the `seq_val` to fit the type of the column.
607    ///
608    /// # Safety
609    ///
610    /// - `self.is_row_present(row)` must hold.
611    /// - `col_id` must be a valid column, with a primitive integer type, of the row type.
612    pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) {
613        let row_ty = self.inner.row_layout.product();
614        // SAFETY: Caller promised that `col_id` was a valid column.
615        let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) };
616        let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else {
617            // SAFETY: Columns with sequences must be primitive types.
618            unsafe { unreachable_unchecked() }
619        };
620
621        let fixed_row_size = self.inner.row_layout.size();
622        let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size);
623
624        fn write<const N: usize>(dst: &mut [u8], offset: u16, bytes: [u8; N]) {
625            let offset = offset as usize;
626            dst[offset..offset + N].copy_from_slice(&bytes);
627        }
628
629        match col_typ {
630            PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()),
631            PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()),
632            PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()),
633            PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()),
634            PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()),
635            PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()),
636            PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()),
637            PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()),
638            PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()),
639            PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()),
640            PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()),
641            PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()),
642            // SAFETY: Columns with sequences must be integer types.
643            PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => unsafe { unreachable_unchecked() },
644        }
645    }
646
647    /// Performs all the checks necessary after having fully decided on a row's contents.
648    ///
649    /// This includes inserting the row into any applicable indices and/or the pointer map.
650    ///
651    /// On `Ok(_)`, the statistics of the table are also updated
652    /// and the `ptr` still points to a valid row; on `Err(_)`, it does not.
653    ///
654    /// If `CHECK_SAME_ROW` holds, an identical row will be treated as a set-semantic duplicate.
655    /// Otherwise, it will be treated as a unique constraint violation.
656    /// However, `false` should only be passed if it's known beforehand that there is no identical row.
657    ///
658    /// # Safety
659    ///
660    /// `self.is_row_present(row)` must hold.
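    ///
    /// # Example
    ///
    /// A sketch of the insert-then-confirm pattern that [`Table::insert`] itself uses
    /// (assuming `pool`, `blob_store`, and `row: &ProductValue` are in scope):
    ///
    /// ```ignore
    /// let (row_ref, blob_bytes) = table.insert_physically_pv(pool, blob_store, row)?;
    /// let ptr = row_ref.pointer();
    /// // SAFETY: `ptr` was just inserted above, so it is present in `table`.
    /// let (hash, ptr) = unsafe { table.confirm_insertion::<true>(blob_store, ptr, blob_bytes) }?;
    /// ```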
661    pub unsafe fn confirm_insertion<'a, const CHECK_SAME_ROW: bool>(
662        &'a mut self,
663        blob_store: &'a mut dyn BlobStore,
664        ptr: RowPointer,
665        blob_bytes: BlobNumBytes,
666    ) -> Result<(Option<RowHash>, RowPointer), InsertError> {
667        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
668        let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?;
669        // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
670        unsafe { self.insert_into_indices::<CHECK_SAME_ROW>(blob_store, ptr) }?;
671
672        self.update_statistics_added_row(blob_bytes);
673        Ok((hash, ptr))
674    }
675
676    /// Confirms a row update, after first updating indices and checking constraints.
677    ///
678    /// On `Ok(_)`:
679    /// - the statistics of the table are also updated,
680    /// - the `ptr` still points to a valid row.
681    ///
682    /// Otherwise, on `Err(_)`:
683    /// - `ptr` will not point to a valid row,
684    /// - the statistics won't be updated.
685    ///
686    /// # Safety
687    ///
688    /// `self.is_row_present(new_ptr)` and `self.is_row_present(old_ptr)` must hold.
689    pub unsafe fn confirm_update<'a>(
690        &'a mut self,
691        blob_store: &'a mut dyn BlobStore,
692        new_ptr: RowPointer,
693        old_ptr: RowPointer,
694        blob_bytes_added: BlobNumBytes,
695    ) -> Result<RowPointer, InsertError> {
696        // (1) Remove old row from indices.
697        // SAFETY: Caller promised that `self.is_row_present(old_ptr)` holds.
698        unsafe { self.delete_from_indices(blob_store, old_ptr) };
699
700        // Insert new row into indices.
701        // SAFETY: Caller promised that `self.is_row_present(new_ptr)` holds.
702        let res = unsafe { self.insert_into_indices::<true>(blob_store, new_ptr) };
703        if let Err(e) = res {
704            // Undo (1).
705            unsafe { self.insert_into_indices::<true>(blob_store, old_ptr) }
706                .expect("re-inserting the old row into indices should always work");
707            return Err(e);
708        }
709
710        // Remove the old row physically.
711        // SAFETY: The physical `old_ptr` still exists.
712        let blob_bytes_removed = unsafe { self.delete_internal_skip_pointer_map(blob_store, old_ptr) };
713        self.update_statistics_deleted_row(blob_bytes_removed);
714
715        // Update statistics.
716        self.update_statistics_added_row(blob_bytes_added);
717        Ok(new_ptr)
718    }
719
720    /// We've added a row; update the statistics to record this.
721    #[inline]
722    fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) {
723        self.row_count += 1;
724        self.blob_store_bytes += blob_bytes;
725    }
726
727    /// We've removed a row; update the statistics to record this.
728    #[inline]
729    fn update_statistics_deleted_row(&mut self, blob_bytes: BlobNumBytes) {
730        self.row_count -= 1;
731        self.blob_store_bytes -= blob_bytes;
732    }
733
734    /// Insert row identified by `new` into indices.
735    /// This also checks unique constraints.
736    /// Deletes the row if there were any violations.
737    ///
738    /// If `CHECK_SAME_ROW`, upon a unique constraint violation,
739    /// this will check if it's really a duplicate row.
740    /// Otherwise, the unique constraint violation is returned.
741    ///
742    /// SAFETY: `self.is_row_present(new)` must hold.
743    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
744    unsafe fn insert_into_indices<'a, const CHECK_SAME_ROW: bool>(
745        &'a mut self,
746        blob_store: &'a mut dyn BlobStore,
747        new: RowPointer,
748    ) -> Result<(), InsertError> {
749        self.indexes
750            .iter_mut()
751            .try_for_each(|(index_id, index)| {
752                // SAFETY: We just inserted `new`, so it must be present.
753                let new = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, new) };
754                // SAFETY: any index in this table was constructed with the same row type as this table.
755                let violation = unsafe { index.check_and_insert(new) };
756                violation.map_err(|old| (*index_id, old, new))
757            })
758            .map_err(|(index_id, old, new)| {
759                // Found unique constraint violation!
760                if CHECK_SAME_ROW
761                    // If the index was added in this tx,
762                    // `old` could be a committed row,
763                    // which we want to avoid here.
764                    // TODO(centril): not 100% correct, could still be a duplicate,
765                    // but this is rather pathological and should be fixed when we restructure.
766                    && old.squashed_offset().is_tx_state()
767                    // SAFETY:
768                    // - The row layouts are the same as it's the same table.
769                    // - We know `old` exists in `self` as we just found it in an index.
770                    // - Caller promised that `new` is valid for `self`.
771                    && unsafe { Self::eq_row_in_page(self, old, self, new.pointer()) }
772                {
773                    return (index_id, DuplicateError(old).into());
774                }
775
776                let index = self.indexes.get(&index_id).unwrap();
777                let value = new.project(&index.indexed_columns).unwrap();
778                let error = self.build_error_unique(index, index_id, value).into();
779                (index_id, error)
780            })
781            .map_err(|(index_id, error)| {
782                // Delete row from indices.
783                // Do this before the actual deletion, as `index.delete` needs a `RowRef`
784                // so it can extract the appropriate value.
785                // SAFETY: We just inserted `new`, so it must be present.
786                unsafe { self.delete_from_indices_until(blob_store, new, index_id) };
787
788                // Clean up: undo the physical insertion of `new`.
789                // SAFETY: We just inserted `new`, so it must be present.
790                unsafe { self.delete_internal(blob_store, new) };
791
792                error
793            })
794    }
795
796    /// Finds the [`RowPointer`], if any, to the row in `target_table`
797    /// that is equal to the row `needle_ptr` in `needle_table`,
798    /// using some unique index in `target_table`.
799    ///
800    /// # Safety
801    ///
802    /// - `target_table` and `needle_table` must have the same `row_layout`.
803    /// - `needle_table.is_row_present(needle_ptr)` must hold.
804    unsafe fn find_same_row_via_unique_index(
805        target_table: &Table,
806        needle_table: &Table,
807        needle_bs: &dyn BlobStore,
808        needle_ptr: RowPointer,
809    ) -> Option<RowPointer> {
810        // Use some index (the one with the lowest `IndexId` currently).
811        // TODO(centril): this isn't what we actually want.
812        // Rather, we'd prefer the index with the simplest type,
813        // but this is left as future work as we don't have to optimize this method now.
814        let target_index = target_table
815            .indexes
816            .values()
817            .find(|idx| idx.is_unique())
818            .expect("there should be at least one unique index");
819        // Project the needle row to the columns of the index, and then seek.
820        // As this is a unique index, there are 0-1 rows for this key.
821        let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
822        let key = needle_row
823            .project(&target_index.indexed_columns)
824            .expect("needle row should be valid");
825        target_index.seek_point(&key).next().filter(|&target_ptr| {
826            // SAFETY:
827            // - Caller promised that the row layouts were the same.
828            // - We know `target_ptr` exists, as it was in `target_index`, belonging to `target_table`.
829            // - Caller promised that `needle_ptr` is valid for `needle_table`.
830            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
831        })
832    }
833
834    /// Insert the row identified by `ptr` into the table's [`PointerMap`],
835    /// if the table has one.
836    ///
837    /// This checks for set semantic violations.
838    /// If a set semantic conflict (i.e. duplicate row) is detected by the pointer map,
839    /// the row will be deleted and an error returned.
840    /// If the pointer map confirms that the row was unique, returns the `RowHash` of that row.
841    ///
842    /// If this table has no `PointerMap`, returns `Ok(None)`.
843    /// In that case, the row's uniqueness will be verified by [`Self::insert_into_indices`],
844    /// as this table has at least one unique index.
845    ///
846    /// SAFETY: `self.is_row_present(row)` must hold.
847    /// Post-condition: If this method returns `Ok(_)`, the row still exists.
848    unsafe fn insert_into_pointer_map<'a>(
849        &'a mut self,
850        blob_store: &'a mut dyn BlobStore,
851        ptr: RowPointer,
852    ) -> Result<Option<RowHash>, DuplicateError> {
853        if self.pointer_map.is_none() {
854            // No pointer map? Set semantic constraint is checked by a unique index instead.
855            return Ok(None);
856        };
857
858        // SAFETY:
859        // - `self` trivially has the same `row_layout` as `self`.
860        // - Caller promised that `self.is_row_present(row)` holds.
861        let (hash, existing_row) = unsafe { Self::find_same_row_via_pointer_map(self, self, blob_store, ptr, None) };
862
863        if let Some(existing_row) = existing_row {
864            // If an equal row was already present,
865            // roll back our optimistic insert to avoid violating set semantics.
866
867            // SAFETY: Caller promised that `ptr` is a valid row in `self`.
868            unsafe {
869                self.inner
870                    .pages
871                    .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
872            };
873            return Err(DuplicateError(existing_row));
874        }
875
876        // If the optimistic insertion was correct,
877        // i.e. this is not a set-semantic duplicate,
878        // add it to the `pointer_map`.
879        self.pointer_map
880            .as_mut()
881            .expect("pointer map should exist, as it did previously")
882            .insert(hash, ptr);
883
884        Ok(Some(hash))
885    }
886
887    /// Returns the list of pointers to rows which hash to `row_hash`.
888    ///
889    /// If `self` does not have a [`PointerMap`], always returns the empty slice.
890    fn pointers_for(&self, row_hash: RowHash) -> &[RowPointer] {
891        self.pointer_map.as_ref().map_or(&[], |pm| pm.pointers_for(row_hash))
892    }
893
894    /// Using the [`PointerMap`],
895    /// searches `target_table` for a row equal to `needle_table[needle_ptr]`.
896    ///
897    /// Rows are compared for equality by [`eq_row_in_page`].
898    ///
899    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
900    ///
901    /// Used for detecting set-semantic duplicates when inserting
902    /// into tables without any unique constraints.
903    ///
904    /// Does nothing and always returns `None` if `target_table` does not have a `PointerMap`,
905    /// in which case the caller should instead use [`Self::find_same_row_via_unique_index`].
906    ///
907    /// Note that we don't need the blob store to compute equality,
908    /// as content-addressing means it's sufficient to compare the hashes of large blobs.
909    /// (If we see a collision in `BlobHash` we have bigger problems.)
910    ///
911    /// # Safety
912    ///
913    /// - `target_table` and `needle_table` must have the same `row_layout`.
914    /// - `needle_table.is_row_present(needle_ptr)`.
915    pub unsafe fn find_same_row_via_pointer_map(
916        target_table: &Table,
917        needle_table: &Table,
918        needle_bs: &dyn BlobStore,
919        needle_ptr: RowPointer,
920        row_hash: Option<RowHash>,
921    ) -> (RowHash, Option<RowPointer>) {
922        let row_hash = row_hash.unwrap_or_else(|| {
923            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
924            let row_ref = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
925            row_ref.row_hash()
926        });
927
928        // Scan all the row pointers with `row_hash` in the `target_table`.
929        let row_ptr = target_table.pointers_for(row_hash).iter().copied().find(|&target_ptr| {
930            // SAFETY:
931            // - Caller promised that the row layouts were the same.
932            // - We know `target_ptr` exists, as it was found in a pointer map.
933            // - Caller promised that `needle_ptr` is valid for `needle_table`.
934            unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
935        });
936
937        (row_hash, row_ptr)
938    }
939
940    /// Returns whether the row `target_ptr` in `target_table`
941    /// is exactly equal to the row `needle_ptr` in `needle_table`.
942    ///
943    /// # Safety
944    ///
945    /// - `target_table` and `needle_table` must have the same `row_layout`.
946    /// - `target_table.is_row_present(target_ptr)`.
947    /// - `needle_table.is_row_present(needle_ptr)`.
948    pub unsafe fn eq_row_in_page(
949        target_table: &Table,
950        target_ptr: RowPointer,
951        needle_table: &Table,
952        needle_ptr: RowPointer,
953    ) -> bool {
954        let (target_page, target_offset) = target_table.inner.page_and_offset(target_ptr);
955        let (needle_page, needle_offset) = needle_table.inner.page_and_offset(needle_ptr);
956
957        // SAFETY:
958        // - Caller promised that `target_ptr` is valid, so `target_page` and `target_offset` are both valid.
959        // - Caller promised that `needle_ptr` is valid, so `needle_page` and `needle_offset` are both valid.
960        // - Caller promised that the layouts of `target_table` and `needle_table` are the same,
961        //   so `target_table` applies to both.
962        //   Moreover `(x: Table).inner.static_layout` is always derived from `x.row_layout`.
963        unsafe {
964            eq_row_in_page(
965                target_page,
966                needle_page,
967                target_offset,
968                needle_offset,
969                &target_table.inner.row_layout,
970                target_table.static_layout(),
971            )
972        }
973    }
974
975    /// Searches `target_table` for a row equal to `needle_table[needle_ptr]`,
976    /// and returns the [`RowPointer`] to that row in `target_table`, if it exists.
977    ///
978    /// Searches using the [`PointerMap`] or a unique index, as appropriate for the table.
979    ///
980    /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
981    ///
982    /// # Safety
983    ///
984    /// - `target_table` and `needle_table` must have the same `row_layout`.
985    /// - `needle_table.is_row_present(needle_ptr)` must hold.
986    pub unsafe fn find_same_row(
987        target_table: &Table,
988        needle_table: &Table,
989        needle_bs: &dyn BlobStore,
990        needle_ptr: RowPointer,
991        row_hash: Option<RowHash>,
992    ) -> (Option<RowHash>, Option<RowPointer>) {
993        if target_table.pointer_map.is_some() {
994            // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
995            // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
996            let (row_hash, row_ptr) = unsafe {
997                Self::find_same_row_via_pointer_map(target_table, needle_table, needle_bs, needle_ptr, row_hash)
998            };
999            (Some(row_hash), row_ptr)
1000        } else {
1001            (
1002                row_hash,
1003                // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
1004                // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
1005                unsafe { Self::find_same_row_via_unique_index(target_table, needle_table, needle_bs, needle_ptr) },
1006            )
1007        }
1008    }
1009
1010    /// Returns a [`RowRef`] for `ptr` or `None` if the row isn't present.
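    ///
    /// A usage sketch (assuming `ptr` was obtained from an earlier insert or scan):
    ///
    /// ```ignore
    /// if let Some(row_ref) = table.get_row_ref(blob_store, ptr) {
    ///     // Materialize the row, e.g. as a `ProductValue`.
    ///     let pv = row_ref.to_product_value();
    /// }
    /// ```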
1011    pub fn get_row_ref<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> Option<RowRef<'a>> {
1012        self.is_row_present(ptr)
1013            // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
1014            .then(|| unsafe { self.get_row_ref_unchecked(blob_store, ptr) })
1015    }
1016
1017    /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
1018    ///
1019    /// # Safety
1020    ///
1021    /// The requirement is that `self.is_row_present(ptr)` must hold.
1022    /// That is, `ptr` must refer to a row within `self`
1023    /// which was previously inserted and has not been deleted since.
1024    ///
1025    /// This means:
1026    /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
1027    /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
1028    ///   and must refer to a valid, live row in that page.
1029    /// - The `SquashedOffset` of `ptr` must match `self.squashed_offset`.
1030    ///
1031    /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
1032    /// and has not been passed to [`Table::delete(table, ..)`]
1033    /// is sufficient to demonstrate all of these properties.
1034    pub unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
1035        // SAFETY: Caller promised that ^-- holds.
1036        unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) }
1037    }
1038
1039    /// Deletes a row in the page manager
1040    /// without deleting it logically in the pointer map.
1041    ///
1042    /// # Safety
1043    ///
1044    /// `ptr` must point to a valid, live row in this table.
1045    pub unsafe fn delete_internal_skip_pointer_map(
1046        &mut self,
1047        blob_store: &mut dyn BlobStore,
1048        ptr: RowPointer,
1049    ) -> BlobNumBytes {
1050        debug_assert!(self.is_row_present(ptr));
1051        // Delete the physical row.
1052        //
1053        // SAFETY:
1054        // - `ptr` points to a valid row in this table, per our invariants.
1055        // - `self.row_size` known to be consistent with `self.pages`,
1056        //    as the two are tied together in `Table::new`.
1057        unsafe {
1058            self.inner
1059                .pages
1060                .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
1061        }
1062    }
1063
1064    /// Deletes the row identified by `ptr` from the table.
1065    ///
1066    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
1067    ///
1068    /// NOTE: This method skips updating indexes.
1069    /// Use `delete_unchecked` or `delete` to delete a row with index updating.
1070    ///
1071    /// SAFETY: `self.is_row_present(row)` must hold.
1072    unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
1073        // Remove the set semantic association.
1074        if let Some(pointer_map) = &mut self.pointer_map {
1075            // SAFETY: `self.is_row_present(row)` holds.
1076            let row = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
1077
1078            let _remove_result = pointer_map.remove(row.row_hash(), ptr);
1079            debug_assert!(_remove_result);
1080        }
1081
1082        // Delete the physical row.
1083        // SAFETY: `ptr` points to a valid row in this table as `self.is_row_present(row)` holds.
1084        unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) }
1085    }
1086
1087    /// Deletes the row identified by `ptr` from the table.
1088    ///
1089    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
1090    ///
1091    /// SAFETY: `self.is_row_present(row)` must hold.
1092    unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
1093        // Delete row from indices.
1094        // Do this before the actual deletion, as `index.delete` needs a `RowRef`
1095        // so it can extract the appropriate value.
1096        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
1097        unsafe { self.delete_from_indices(blob_store, ptr) };
1098
1099        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
1100        unsafe { self.delete_internal(blob_store, ptr) }
1101    }
1102
1103    /// Deletes the row at `ptr` from all the indices of this table until `index_id` is reached.
1104    /// The range is exclusive of `index_id`.
1105    ///
1106    /// SAFETY: `self.is_row_present(row)` must hold.
1107    unsafe fn delete_from_indices_until(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer, index_id: IndexId) {
1108        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
1109        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
1110
1111        for (_, index) in self.indexes.range_mut(..index_id) {
1112            index.delete(row_ref).unwrap();
1113        }
1114    }
1115
1116    /// Deletes the row at `ptr` from all the indices of this table.
1117    ///
1118    /// SAFETY: `self.is_row_present(row)` must hold.
1119    unsafe fn delete_from_indices(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer) {
1120        // SAFETY: Caller promised that `self.is_row_present(row)` holds.
1121        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
1122
1123        for index in self.indexes.values_mut() {
1124            index.delete(row_ref).unwrap();
1125        }
1126    }
1127
1128    /// Deletes the row identified by `ptr` from the table.
1129    ///
1130    /// The function `before` is run on the to-be-deleted row,
1131    /// if it is present, before deleting.
1132    /// This enables callers to extract the deleted row.
1133    /// E.g. applying deletes when squashing/merging a transaction into the committed state
1134    /// passes `|row| row.to_product_value()` as `before`
1135    /// so that the resulting `ProductValue`s can be passed to the subscription evaluator.
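    ///
    /// # Example
    ///
    /// A usage sketch that extracts the deleted row as a `ProductValue`
    /// (assuming `ptr` came from an earlier insert or scan):
    ///
    /// ```ignore
    /// if let Some(deleted) = table.delete(blob_store, ptr, |row| row.to_product_value()) {
    ///     // `deleted` holds the row's contents; the row itself is now gone from the table.
    /// }
    /// ```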
1136    pub fn delete<'a, R>(
1137        &'a mut self,
1138        blob_store: &'a mut dyn BlobStore,
1139        ptr: RowPointer,
1140        before: impl for<'b> FnOnce(RowRef<'b>) -> R,
1141    ) -> Option<R> {
1142        if !self.is_row_present(ptr) {
1143            return None;
1144        };
1145
1146        // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
1147        let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, ptr) };
1148
1149        let ret = before(row_ref);
1150
1151        // SAFETY: We've checked above that `self.is_row_present(ptr)`.
1152        let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) };
1153        self.update_statistics_deleted_row(blob_bytes_deleted);
1154
1155        Some(ret)
1156    }
1157
1158    /// If a row exists in `self` which matches `row`
1159    /// by [`Table::find_same_row`],
1160    /// delete that row.
1161    ///
1162    /// If a matching row was found, returns the pointer to that row.
1163    /// The returned pointer is now invalid, as the row to which it referred has been deleted.
1164    ///
1165    /// This operation works by temporarily inserting the `row` into `self`,
1166    /// checking `find_same_row` on the newly-inserted row,
1167    /// deleting the matching row if it exists,
1168    /// then deleting the temporary insertion.
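    ///
    /// # Example
    ///
    /// A usage sketch (assuming `pool`, `blob_store`, and `row: &ProductValue` are in scope):
    ///
    /// ```ignore
    /// if let Some(old_ptr) = table.delete_equal_row(pool, blob_store, row)? {
    ///     // A row equal to `row` existed and has been deleted; `old_ptr` is now dangling.
    /// }
    /// ```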
1169    pub fn delete_equal_row(
1170        &mut self,
1171        pool: &PagePool,
1172        blob_store: &mut dyn BlobStore,
1173        row: &ProductValue,
1174    ) -> Result<Option<RowPointer>, Error> {
1175        // Insert `row` temporarily so `temp_ptr` can be used to find the row.
1176        // This must avoid consulting and inserting to the pointer map,
1177        // as the row is already present, set-semantically.
1178        let (temp_row, _) = self.insert_physically_pv(pool, blob_store, row)?;
1179        let temp_ptr = temp_row.pointer();
1180
1181        // Find the row equal to the passed-in `row`.
1182        // This uses one of two approaches.
1183        // Either there is a pointer map, so we use that,
1184        // or there is at least one unique index, so we use one of them.
1185        //
1186        // SAFETY:
1187        // - `self` trivially has the same `row_layout` as `self`.
1188        // - We just inserted `temp_ptr`, so it's valid.
1189        let (_, existing_row_ptr) = unsafe { Self::find_same_row(self, self, blob_store, temp_ptr, None) };
1190
1191        // If an equal row was present, delete it.
1192        if let Some(existing_row_ptr) = existing_row_ptr {
1193            let blob_bytes_deleted = unsafe {
1194                // SAFETY: `find_same_row` ensures that the pointer is valid.
1195                self.delete_unchecked(blob_store, existing_row_ptr)
1196            };
1197            self.update_statistics_deleted_row(blob_bytes_deleted);
1198        }
1199
1200        // Remove the temporary row we inserted in the beginning.
1201        // Avoid the pointer map, since we don't want to delete it twice.
1202        // SAFETY: `temp_ptr` is valid as we just inserted it.
1203        unsafe {
1204            self.delete_internal_skip_pointer_map(blob_store, temp_ptr);
1205        }
1206
1207        Ok(existing_row_ptr)
1208    }
1209
1210    /// Returns the row type for rows in this table.
1211    pub fn get_row_type(&self) -> &ProductType {
1212        self.get_schema().get_row_type()
1213    }
1214
1215    /// Returns the schema for this table.
1216    pub fn get_schema(&self) -> &Arc<TableSchema> {
1217        &self.schema
1218    }
1219
1220    /// Runs a mutation on the [`TableSchema`] of this table.
1221    ///
1222    /// This uses a clone-on-write mechanism.
1223    /// If none but `self` refers to the schema, then the mutation will be in-place.
1224    /// Otherwise, the schema must be cloned, mutated,
1225    /// and then the cloned version is written back to the table.
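    ///
    /// # Example
    ///
    /// A sketch of a copy-on-write schema mutation; the particular mutation shown
    /// (renaming the table) is illustrative and assumes `table_name` is writable:
    ///
    /// ```ignore
    /// table.with_mut_schema(|schema| schema.table_name = "renamed_table".into());
    /// ```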
1226    pub fn with_mut_schema<R>(&mut self, with: impl FnOnce(&mut TableSchema) -> R) -> R {
1227        with(Arc::make_mut(&mut self.schema))
1228    }
1229
1230    /// Runs a mutation on the [`TableSchema`] of this table
1231    /// and then sets the schema of `other` to that of `self`.
1232    pub fn with_mut_schema_and_clone<R>(&mut self, other: &mut Table, with: impl FnOnce(&mut TableSchema) -> R) -> R {
1233        let ret = self.with_mut_schema(with);
1234        other.use_schema_of(self);
1235        ret
1236    }
1237
1238    /// Makes `self` use the schema of `other`.
1239    ///
1240    /// Here, `self` will typically be a commit table and `other` a tx table, or the reverse.
1241    fn use_schema_of(&mut self, other: &Self) {
1242        self.schema = other.get_schema().clone();
1243    }
1244
1245    /// Returns a new [`TableIndex`] for this table.
1246    pub fn new_index(&self, algo: &IndexAlgorithm, is_unique: bool) -> Result<TableIndex, InvalidFieldError> {
1247        TableIndex::new(self.get_schema().get_row_type(), algo, is_unique)
1248    }
1249
1250    /// Inserts a new `index` into the table.
1251    ///
1252    /// The index will be populated using the rows of the table.
1253    ///
1254    /// # Panics
1255    ///
1256    /// Panics if any row would violate `index`'s unique constraint, if it has one.
1257    ///
1258    /// # Safety
1259    ///
1260    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
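    ///
    /// # Example
    ///
    /// A sketch of building and installing a unique index over column 0,
    /// mirroring this module's tests (`blob_store` and `index_id` are illustrative):
    ///
    /// ```ignore
    /// let algo = BTreeAlgorithm { columns: col_list![0] }.into();
    /// let index = table.new_index(&algo, true)?;
    /// // SAFETY: `index` was just built from `table`'s own row type.
    /// unsafe { table.insert_index(blob_store, index_id, index) };
    /// ```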
1261    pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) {
1262        let rows = self.scan_rows(blob_store);
1263        // SAFETY: Caller promised that table's row type/layout
1264        // matches that which `index` was constructed with.
1265        // It follows that this applies to any `rows`, as required.
1266        let violation = unsafe { index.build_from_rows(rows) };
1267        violation.unwrap_or_else(|ptr| {
1268            panic!("adding `index` should cause no unique constraint violations, but {ptr:?} would")
1269        });
1270        // SAFETY: Forward caller requirement.
1271        unsafe { self.add_index(index_id, index) };
1272    }
1273
1274    /// Adds an index to the table without populating it. Returns the [`PointerMap`] if adding a unique index caused it to be removed.
1275    ///
1276    /// # Safety
1277    ///
1278    /// Caller must promise that `index` was constructed with the same row type/layout as this table.
1279    pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) -> Option<PointerMap> {
1280        let is_unique = index.is_unique();
1281        self.indexes.insert(index_id, index);
1282
1283        // Remove the pointer map, if any.
1284        if is_unique {
1285            self.pointer_map.take()
1286        } else {
1287            None
1288        }
1289    }
1290
1291    /// Removes an index from the table.
1292    ///
1293    /// Returns the removed index, if any index with `index_id` existed.
1294    pub fn delete_index(
1295        &mut self,
1296        blob_store: &dyn BlobStore,
1297        index_id: IndexId,
1298        pointer_map: Option<PointerMap>,
1299    ) -> Option<TableIndex> {
1300        let index = self.indexes.remove(&index_id)?;
1301
1302        // If we removed the last unique index, add a pointer map.
1303        if index.is_unique() && !self.indexes.values().any(|idx| idx.is_unique()) {
1304            self.pointer_map = Some(pointer_map.unwrap_or_else(|| self.rebuild_pointer_map(blob_store)));
1305        }
1306
1307        Some(index)
1308    }
1309
1310    /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s.
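    ///
    /// # Example
    ///
    /// A sketch of a full scan, assuming a populated `table` and a `blob_store`:
    ///
    /// ```ignore
    /// // Materialize every row as a `ProductValue` (potentially expensive).
    /// let rows: Vec<ProductValue> = table
    ///     .scan_rows(blob_store)
    ///     .map(|row_ref| row_ref.to_product_value())
    ///     .collect();
    /// ```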
1311    pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> {
1312        TableScanIter {
1313            current_page: None, // Will be filled by the iterator.
1314            current_page_idx: PageIndex(0),
1315            table: self,
1316            blob_store,
1317        }
1318    }
1319
1320    /// Returns this table combined with the index for [`IndexId`], if any.
1321    pub fn get_index_by_id_with_table<'a>(
1322        &'a self,
1323        blob_store: &'a dyn BlobStore,
1324        index_id: IndexId,
1325    ) -> Option<TableAndIndex<'a>> {
1326        Some(TableAndIndex {
1327            table: self,
1328            blob_store,
1329            index: self.get_index_by_id(index_id)?,
1330        })
1331    }
1332
1333    /// Returns the [`TableIndex`] for this [`IndexId`].
1334    pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&TableIndex> {
1335        self.indexes.get(&index_id)
1336    }
1337
1338    /// Returns this table combined with the first index with `cols`, if any.
1339    pub fn get_index_by_cols_with_table<'a>(
1340        &'a self,
1341        blob_store: &'a dyn BlobStore,
1342        cols: &ColList,
1343    ) -> Option<TableAndIndex<'a>> {
1344        let (_, index) = self.get_index_by_cols(cols)?;
1345        Some(TableAndIndex {
1346            table: self,
1347            blob_store,
1348            index,
1349        })
1350    }
1351
1352    /// Returns the first [`TableIndex`] with the given [`ColList`].
1353    pub fn get_index_by_cols(&self, cols: &ColList) -> Option<(IndexId, &TableIndex)> {
1354        self.indexes
1355            .iter()
1356            .find(|(_, index)| &index.indexed_columns == cols)
1357            .map(|(id, idx)| (*id, idx))
1358    }
1359
1360    /// Clones the structure of this table into a new one with
1361    /// the same schema, visitor program, and indices.
1362    /// The new table will be completely empty
1363    /// and will use the given `squashed_offset` instead of that of `self`.
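    ///
    /// # Example
    ///
    /// A sketch of deriving a transaction scratchpad table from a committed-state table
    /// (a minimal illustration, not a prescribed workflow):
    ///
    /// ```ignore
    /// // Same schema, layout, and (empty) indexes, but tagged as tx state.
    /// let tx_table = committed_table.clone_structure(SquashedOffset::TX_STATE);
    /// ```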
1364    pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self {
1365        // Clone a bunch of static data.
1366        // NOTE(centril): It's important that these be cheap to clone.
1367        // This is why they are all `Arc`ed or have some sort of small-vec optimization.
1368        let schema = self.schema.clone();
1369        let layout = self.row_layout().clone();
1370        let sbl = self.inner.static_layout.clone();
1371        let visitor = self.inner.visitor_prog.clone();
1372
1373        // If `self` has a pointer map, the clone will have one as well, but empty.
1374        let pm = self.pointer_map.as_ref().map(|_| PointerMap::default());
1375
1376        // Make the new table.
1377        let mut new = Table::new_raw(schema, layout, sbl, visitor, squashed_offset, pm);
1378
1379        // Clone the index structure. The table is empty, so no need to `build_from_rows`.
1380        for (&index_id, index) in self.indexes.iter() {
1381            new.indexes.insert(index_id, index.clone_structure());
1382        }
1383        new
1384    }
1385
1386    /// Returns the number of bytes occupied by the pages and the blob store.
1387    /// Note that the result can be more than the actual physical size occupied by the table,
1388    /// because the blob store implementation can do internal optimizations.
1389    /// For more details, refer to the documentation of `self.blob_store_bytes`.
1390    pub fn bytes_occupied_overestimate(&self) -> usize {
1391        (self.num_pages() * PAGE_DATA_SIZE) + (self.blob_store_bytes.0)
1392    }
1393
1394    /// Reset the internal storage of `self` to be `pages`.
1395    ///
1396    /// This recomputes the pointer map based on the `pages`,
1397    /// but does not recompute indexes.
1398    ///
1399    /// Used when restoring from a snapshot.
1400    ///
1401    /// # Safety
1402    ///
1403    /// The schema of rows stored in the `pages` must exactly match `self.schema` and `self.inner.row_layout`.
1404    pub unsafe fn set_pages(&mut self, pages: Vec<Box<Page>>, blob_store: &dyn BlobStore) {
1405        self.inner.pages.set_contents(pages, self.inner.row_layout.size());
1406
1407        // Recompute table metadata based on the new pages.
1408        // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate.
1409        self.compute_row_count(blob_store);
1410        self.pointer_map = Some(self.rebuild_pointer_map(blob_store));
1411    }
1412
1413    /// Consumes the table, returning some constituents needed for merge.
1414    pub fn consume_for_merge(
1415        self,
1416    ) -> (
1417        Arc<TableSchema>,
1418        impl Iterator<Item = (IndexId, TableIndex)>,
1419        impl Iterator<Item = Box<Page>>,
1420    ) {
1421        (self.schema, self.indexes.into_iter(), self.inner.pages.into_page_iter())
1422    }
1423
1424    /// Returns the number of rows resident in this table.
1425    ///
1426    /// This method runs in constant time.
1427    pub fn num_rows(&self) -> u64 {
1428        self.row_count
1429    }
1430
1431    #[cfg(test)]
1432    fn reconstruct_num_rows(&self) -> u64 {
1433        self.pages().iter().map(|page| page.reconstruct_num_rows() as u64).sum()
1434    }
1435
1436    /// Returns the number of bytes used by rows resident in this table.
1437    ///
1438    /// This includes data bytes, padding bytes and some overhead bytes,
1439    /// as described in the docs for [`Page::bytes_used_by_rows`],
1440    /// but *does not* include:
1441    ///
1442    /// - Unallocated space within pages.
1443    /// - Per-page overhead (e.g. page headers).
1444    /// - Table overhead (e.g. the [`RowTypeLayout`], [`PointerMap`], [`Schema`] &c).
1445    /// - Indexes.
1446    /// - Large blobs in the [`BlobStore`].
1447    ///
1448    /// Of these, the caller should inspect the blob store in order to account for memory usage by large blobs,
1449    /// and call [`Self::bytes_used_by_index_keys`] to account for indexes,
1450    /// but we intend to eat all the other overheads when billing.
1451    ///
1452    // TODO(perf, centril): consider storing the total number of granules in the table instead
1453    // so that this runs in constant time rather than O(|Pages|).
1454    pub fn bytes_used_by_rows(&self) -> u64 {
1455        self.pages()
1456            .iter()
1457            .map(|page| page.bytes_used_by_rows(self.inner.row_layout.size()) as u64)
1458            .sum()
1459    }
1460
1461    #[cfg(test)]
1462    fn reconstruct_bytes_used_by_rows(&self) -> u64 {
1463        self.pages()
1464            .iter()
1465            .map(|page| unsafe {
1466                // Safety: `page` is in `self`, and was constructed using `self.inner.row_layout` and `self.inner.visitor_prog`,
1467                // so the three are mutually consistent.
1468                page.reconstruct_bytes_used_by_rows(self.inner.row_layout.size(), &self.inner.visitor_prog)
1469            } as u64)
1470            .sum()
1471    }
1472
1473    /// Returns the number of indices in this table.
1474    pub fn num_indices(&self) -> usize {
1475        self.indexes.len()
1476    }
1477
1478    /// Returns the number of rows (or [`RowPointer`]s, more accurately)
1479    /// stored in indexes by this table.
1480    ///
1481    /// This method runs in constant time.
1482    pub fn num_rows_in_indexes(&self) -> u64 {
1483        // Assume that each index contains all rows in the table.
1484        self.num_rows() * self.indexes.len() as u64
1485    }
1486
1487    /// Returns the number of bytes used by keys stored in indexes by this table.
1488    ///
1489    /// This method scales in runtime with the number of indexes in the table,
1490    /// but not with the number of pages or rows.
1491    ///
1492    /// Key size is measured using a metric called "key size" or "data size,"
1493    /// which is intended to capture the number of live user-supplied bytes,
1494    /// not including representational overhead.
1495    /// This is distinct from the BFLATN size measured by [`Self::bytes_used_by_rows`].
1496    /// See the trait [`crate::table_index::KeySize`] for specifics on the metric measured.
1497    pub fn bytes_used_by_index_keys(&self) -> u64 {
1498        self.indexes.values().map(|idx| idx.num_key_bytes()).sum()
1499    }
1500}
1501
1502/// A reference to a single row within a table.
1503///
1504/// # Safety
1505///
1506/// Having a `r: RowRef` is a proof that [`r.pointer()`](RowRef::pointer) refers to a valid row.
1507/// This makes constructing a `RowRef`, i.e., `RowRef::new`, an `unsafe` operation.
1508#[derive(Copy, Clone)]
1509pub struct RowRef<'a> {
1510    /// The table that has the row at `self.pointer`.
1511    table: &'a TableInner,
1512    /// The blob store used in case there are blob hashes to resolve.
1513    blob_store: &'a dyn BlobStore,
1514    /// The pointer to the row in `self.table`.
1515    pointer: RowPointer,
1516}
1517
1518impl fmt::Debug for RowRef<'_> {
1519    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1520        fmt.debug_struct("RowRef")
1521            .field("pointer", &self.pointer)
1522            .field("value", &self.to_product_value())
1523            .finish_non_exhaustive()
1524    }
1525}
1526
1527impl<'a> RowRef<'a> {
1528    /// Construct a `RowRef` to the row at `pointer` within `table`.
1529    ///
1530    /// # Safety
1531    ///
1532    /// `pointer` must refer to a row within `table`
1533    /// which was previously inserted and has not been deleted since.
1534    ///
1535    /// This means:
1536    /// - The `PageIndex` of `pointer` must be in-bounds for `table.pages`.
1537    /// - The `PageOffset` of `pointer` must be properly aligned for the row type of `table`,
1538    ///   and must refer to a valid, live row in that page.
1539    /// - The `SquashedOffset` of `pointer` must match `table.squashed_offset`.
1540    ///
1541    /// Showing that `pointer` was the result of a call to `table.insert`
1542    /// and has not been passed to `table.delete`
1543    /// is sufficient to demonstrate all of these properties.
1544    unsafe fn new(
1545        table: &'a TableInner,
1546        blob_store: &'a dyn BlobStore,
1547        _squashed_offset: SquashedOffset,
1548        pointer: RowPointer,
1549    ) -> Self {
1550        debug_assert!(table.is_row_present(_squashed_offset, pointer));
1551        Self {
1552            table,
1553            blob_store,
1554            pointer,
1555        }
1556    }
1557
1558    /// Extract a `ProductValue` from the table.
1559    ///
1560    /// This is a potentially expensive operation,
1561    /// as it must walk the table's `ProductTypeLayout`
1562    /// and heap-allocate various substructures of the `ProductValue`.
1563    pub fn to_product_value(&self) -> ProductValue {
1564        let res = self
1565            .serialize(ValueSerializer)
1566            .unwrap_or_else(|x| match x {})
1567            .into_product();
1568        // SAFETY: the top layer of a row when serialized is always a product.
1569        unsafe { res.unwrap_unchecked() }
1570    }
1571
1572    /// Check that the `idx`th column of the row type stored by `self` is compatible with `T`,
1573    /// and read the value of that column from `self`.
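    ///
    /// # Example
    ///
    /// A sketch, assuming column 0 of the row type is a `u32`:
    ///
    /// ```ignore
    /// let value: u32 = row_ref.read_col(ColId(0))?;
    /// // `AlgebraicValue` can read a column of any type.
    /// let dynamic: AlgebraicValue = row_ref.read_col(ColId(0))?;
    /// ```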
1574    #[inline]
1575    pub fn read_col<T: ReadColumn>(self, col: impl Into<ColId>) -> Result<T, TypeError> {
1576        T::read_column(self, col.into().idx())
1577    }
1578
1579    /// Construct a projection of the row at `self` by extracting the `cols`.
1580    ///
1581    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1582    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
1583    ///
1584    /// # Safety
1585    ///
1586    /// - `cols` must not specify any column which is out-of-bounds for the row `self`.
1587    pub unsafe fn project_unchecked(self, cols: &ColList) -> AlgebraicValue {
1588        let col_layouts = &self.row_layout().product().elements;
1589
1590        if let Some(head) = cols.as_singleton() {
1591            let head = head.idx();
1592            // SAFETY: caller promised that `head` is in-bounds of `col_layouts`.
1593            let col_layout = unsafe { col_layouts.get_unchecked(head) };
1594            // SAFETY:
1595            // - `col_layout` was just derived from the row layout.
1596            // - `AlgebraicValue` is compatible with any  `col_layout`.
1597            // - `self` is a valid row and offsetting to `col_layout` is valid.
1598            return unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) };
1599        }
1600        let mut elements = Vec::with_capacity(cols.len() as usize);
1601        for col in cols.iter() {
1602            let col = col.idx();
1603            // SAFETY: caller promised that any `col` is in-bounds of `col_layouts`.
1604            let col_layout = unsafe { col_layouts.get_unchecked(col) };
1605            // SAFETY:
1606            // - `col_layout` was just derived from the row layout.
1607            // - `AlgebraicValue` is compatible with any  `col_layout`.
1608            // - `self` is a valid row and offsetting to `col_layout` is valid.
1609            elements.push(unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) });
1610        }
1611        AlgebraicValue::product(elements)
1612    }
1613
1614    /// Construct a projection of the row at `self` by extracting the `cols`.
1615    ///
1616    /// Returns an error if `cols` specifies an index which is out-of-bounds for the row at `self`.
1617    ///
1618    /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1619    /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
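    ///
    /// # Example
    ///
    /// A sketch of projecting an index key out of a row (column numbers are illustrative):
    ///
    /// ```ignore
    /// // A single column comes back unwrapped ...
    /// let single: AlgebraicValue = row_ref.project(&ColList::from(ColId(0)))?;
    /// // ... while several columns come back wrapped in a product value.
    /// let pair: AlgebraicValue = row_ref.project(&ColList::from([ColId(0), ColId(1)]))?;
    /// ```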
1620    pub fn project(self, cols: &ColList) -> Result<AlgebraicValue, InvalidFieldError> {
1621        if let Some(head) = cols.as_singleton() {
1622            return self.read_col(head).map_err(|_| head.into());
1623        }
1624        let mut elements = Vec::with_capacity(cols.len() as usize);
1625        for col in cols.iter() {
1626            let col_val = self.read_col(col).map_err(|err| match err {
1627                TypeError::WrongType { .. } => {
1628                    unreachable!("AlgebraicValue::read_column never returns a `TypeError::WrongType`")
1629                }
1630                TypeError::IndexOutOfBounds { .. } => col,
1631            })?;
1632            elements.push(col_val);
1633        }
1634        Ok(AlgebraicValue::product(elements))
1635    }
1636
1637    /// Returns the raw row pointer for this row reference.
1638    pub fn pointer(&self) -> RowPointer {
1639        self.pointer
1640    }
1641
1642    /// Returns the blob store that any [`crate::blob_store::BlobHash`]es within the row refer to.
1643    pub(crate) fn blob_store(&self) -> &dyn BlobStore {
1644        self.blob_store
1645    }
1646
1647    /// Return the layout of the row.
1648    ///
1649    /// All rows within the same table will have the same layout.
1650    pub fn row_layout(&self) -> &RowTypeLayout {
1651        &self.table.row_layout
1652    }
1653
1654    /// Returns the page the row is in and the offset of the row within that page.
1655    pub fn page_and_offset(&self) -> (&Page, PageOffset) {
1656        self.table.page_and_offset(self.pointer())
1657    }
1658
1659    /// Returns the bytes for the fixed portion of this row.
1660    pub(crate) fn get_row_data(&self) -> &Bytes {
1661        let (page, offset) = self.page_and_offset();
1662        page.get_row_data(offset, self.table.row_layout.size())
1663    }
1664
1665    /// Returns the row hash of the row referred to by `self`.
1666    pub fn row_hash(&self) -> RowHash {
1667        RowHash(RowHash::hasher_builder().hash_one(self))
1668    }
1669
1670    /// Returns the static layout for this row reference, if any.
1671    pub fn static_layout(&self) -> Option<&StaticLayout> {
1672        self.table.static_layout.as_ref().map(|(s, _)| s)
1673    }
1674
1675    /// Encode the row referred to by `self` into a `Vec<u8>` using BSATN and then deserialize it.
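    ///
    /// # Example
    ///
    /// A sketch, where `MyRow` is an illustrative type deriving `Deserialize`
    /// whose fields match this table's row type:
    ///
    /// ```ignore
    /// let mut scratch = Vec::new();
    /// let decoded: MyRow = row_ref.read_via_bsatn(&mut scratch)?;
    /// ```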
1676    pub fn read_via_bsatn<T>(&self, scratch: &mut Vec<u8>) -> Result<T, ReadViaBsatnError>
1677    where
1678        T: DeserializeOwned,
1679    {
1680        self.to_bsatn_extend(scratch)?;
1681        Ok(bsatn::from_slice::<T>(scratch)?)
1682    }
1683
1684    /// Return the number of bytes in the blob store to which this object holds a reference.
1685    ///
1686    /// Used to compute the table's `blob_store_bytes` when reconstructing a snapshot.
1687    ///
1688    /// Even within a single row, this is a conservative overestimate,
1689    /// as a row may contain multiple references to the same large blob.
1690    /// This seems unlikely to occur in practice.
1691    fn blob_store_bytes(&self) -> usize {
1692        let row_data = self.get_row_data();
1693        let (page, _) = self.page_and_offset();
1694        // SAFETY:
1695        // - Existence of a `RowRef` treated as proof
1696        //   of the row's validity and type information's correctness.
1697        unsafe { self.table.visitor_prog.visit_var_len(row_data) }
1698            .filter(|vlr| vlr.is_large_blob())
1699            .map(|vlr| {
1700                // SAFETY:
1701                // - Because `vlr.is_large_blob`, it points to exactly one granule.
1702                let granule = unsafe { page.iter_var_len_object(vlr.first_granule) }.next().unwrap();
1703                let blob_hash = granule.blob_hash();
1704                let blob = self.blob_store.retrieve_blob(&blob_hash).unwrap();
1705
1706                blob.len()
1707            })
1708            .sum()
1709    }
1710}
1711
1712impl Serialize for RowRef<'_> {
1713    fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
1714        let table = self.table;
1715        let (page, offset) = table.page_and_offset(self.pointer);
1716        // SAFETY: the existence of `self` proves that `self.pointer` refers to a valid row in this table.
1717        unsafe { serialize_row_from_page(ser, page, self.blob_store, offset, &table.row_layout) }
1718    }
1719}
1720
1721impl ToBsatn for RowRef<'_> {
1722    /// BSATN-encode the row referred to by `self` into a freshly-allocated `Vec<u8>`.
1723    ///
1724    /// This method will use a [`StaticLayout`] if one is available,
1725    /// and may therefore be faster than calling [`bsatn::to_vec`].
1726    fn to_bsatn_vec(&self) -> Result<Vec<u8>, BsatnError> {
1727        if let Some(static_layout) = self.static_layout() {
1728            // Use fast path, by first fetching the row data and then using the static layout.
1729            let row = self.get_row_data();
1730            // SAFETY:
1731            // - Existence of a `RowRef` treated as proof
1732            //   of row's validity and type information's correctness.
1733            Ok(unsafe { static_layout.serialize_row_into_vec(row) })
1734        } else {
1735            bsatn::to_vec(self)
1736        }
1737    }
1738
1739    /// BSATN-encode the row referred to by `self` into `buf`,
1740    /// pushing `self`'s bytes onto the end of `buf`, similar to [`Vec::extend`].
1741    ///
1742    /// This method will use a [`StaticLayout`] if one is available,
1743    /// and may therefore be faster than calling [`bsatn::to_writer`].
1744    fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> Result<(), BsatnError> {
1745        if let Some(static_layout) = self.static_layout() {
1746            // Use fast path, by first fetching the row data and then using the static layout.
1747            let row = self.get_row_data();
1748            // SAFETY:
1749            // - Existence of a `RowRef` treated as proof
1750            //   of row's validity and type information's correctness.
1751            unsafe {
1752                static_layout.serialize_row_extend(buf, row);
1753            }
1754            Ok(())
1755        } else {
1756            // Use the slower, but more general, `bflatn_from` serializer to write the row.
1757            bsatn::to_writer(buf, self)
1758        }
1759    }
1760
1761    fn static_bsatn_size(&self) -> Option<u16> {
1762        self.static_layout().map(|sl| sl.bsatn_length)
1763    }
1764}
1765
1766impl Eq for RowRef<'_> {}
1767impl PartialEq for RowRef<'_> {
1768    fn eq(&self, other: &Self) -> bool {
1769        // Ensure that the layouts are the same
1770        // so that we can use `eq_row_in_page`.
1771        // To do this, we first try address equality on the layouts.
1772        // This should succeed when the rows originate from the same table.
1773        // Otherwise, actually compare the layouts, which is expensive, but unlikely to happen.
1774        let a_ty = self.row_layout();
1775        let b_ty = other.row_layout();
1776        if !(ptr::eq(a_ty, b_ty) || a_ty == b_ty) {
1777            return false;
1778        }
1779        let (page_a, offset_a) = self.page_and_offset();
1780        let (page_b, offset_b) = other.page_and_offset();
1781        let static_layout = self.static_layout();
1782        // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty`
1783        // and `static_layout` is derived from `a_ty`.
1784        unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) }
1785    }
1786}
1787
1788impl PartialEq<ProductValue> for RowRef<'_> {
1789    fn eq(&self, rhs: &ProductValue) -> bool {
1790        let ty = self.row_layout();
1791        let (page, offset) = self.page_and_offset();
1792        // SAFETY: By having `RowRef`,
1793        // we know that `offset` is a valid offset for a row in `page` typed at `ty`.
1794        unsafe { eq_row_in_page_to_pv(self.blob_store, page, offset, rhs, ty) }
1795    }
1796}
1797
1798impl Hash for RowRef<'_> {
1799    fn hash<H: Hasher>(&self, state: &mut H) {
1800        let (page, offset) = self.table.page_and_offset(self.pointer);
1801        let ty = &self.table.row_layout;
1802        // SAFETY: A `RowRef` is a proof that `self.pointer` refers to a live fixed row in `self.table`, so:
1803        // 1. `offset` points at a row in `page` lasting `ty.size()` bytes.
1804        // 2. the row is valid for `ty`.
1805        // 3. for any `vlr: VarLenRef` stored in the row,
1806        //    `vlr.first_offset` is either `NULL` or points to a valid granule in `page`.
1807        unsafe { hash_row_in_page(state, page, self.blob_store, offset, ty) };
1808    }
1809}
1810
1811/// An iterator over all the rows, yielded as [`RowRef`]s, in a table.
1812pub struct TableScanIter<'table> {
1813    /// The current page we're yielding rows from.
1814    /// When `None`, the iterator will attempt to advance to the next page, if any.
1815    current_page: Option<FixedLenRowsIter<'table>>,
1816    /// The current page index we are or will visit.
1817    current_page_idx: PageIndex,
1818    /// The table the iterator is yielding rows from.
1819    pub(crate) table: &'table Table,
1820    /// The `BlobStore` that row references may refer into.
1821    pub(crate) blob_store: &'table dyn BlobStore,
1822}
1823
1824impl<'a> Iterator for TableScanIter<'a> {
1825    type Item = RowRef<'a>;
1826
1827    fn next(&mut self) -> Option<Self::Item> {
1828        // This could have been written using `.flat_map`,
1829        // but we don't have `type Foo = impl Iterator<...>;` on stable yet.
1830        loop {
1831            match &mut self.current_page {
1832                // We're currently visiting a page,
1833                Some(iter_fixed_len) => {
1834                    if let Some(page_offset) = iter_fixed_len.next() {
1835                        // There's still at least one row in that page to visit,
1836                        // return a ref to that row.
1837                        let ptr =
1838                            RowPointer::new(false, self.current_page_idx, page_offset, self.table.squashed_offset);
1839
1840                        // SAFETY: `offset` came from the `iter_fixed_len`, so it must point to a valid row.
1841                        let row_ref = unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) };
1842                        return Some(row_ref);
1843                    } else {
1844                        // We've finished visiting that page, so set `current_page` to `None`,
1845                        // increment `self.current_page_idx` to the index of the next page,
1846                        // and go to the `None` case (1) in the match.
1847                        self.current_page = None;
1848                        self.current_page_idx.0 += 1;
1849                    }
1850                }
1851
1852                // (1) If we aren't currently visiting a page,
1853                // the `else` case in the `Some` match arm
1854                // already incremented `self.current_page_idx`,
1855                // or we're just beginning and so it was initialized as 0.
1856                None => {
1857                    // If there's another page, set `self.current_page` to it,
1858                    // and go to the `Some` case in the match.
1859                    let next_page = self.table.pages().get(self.current_page_idx.idx())?;
1860                    let iter = next_page.iter_fixed_len(self.table.row_size());
1861                    self.current_page = Some(iter);
1862                }
1863            }
1864        }
1865    }
1866}
1867
1868/// A combined table and index,
1869/// allowing direct extraction of an [`IndexScanPointIter`] or [`IndexScanRangeIter`].
1870#[derive(Copy, Clone)]
1871pub struct TableAndIndex<'a> {
1872    table: &'a Table,
1873    blob_store: &'a dyn BlobStore,
1874    index: &'a TableIndex,
1875}
1876
1877impl<'a> TableAndIndex<'a> {
1878    pub fn table(&self) -> &'a Table {
1879        self.table
1880    }
1881
1882    pub fn index(&self) -> &'a TableIndex {
1883        self.index
1884    }
1885
1886    /// Wraps `ptr` in a [`RowRef`].
1887    ///
1888    /// # Safety
1889    ///
1890    /// `self.table().is_row_present(ptr)` must hold.
1891    pub unsafe fn combine_with_ptr(&self, ptr: RowPointer) -> RowRef<'a> {
1892        // SAFETY: forward caller requirement.
1893        unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1894    }
1895
1896    /// Returns an iterator yielding all rows in this index for `key`.
1897    ///
1898    /// Matching is defined by `Ord for AlgebraicValue`.
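    ///
    /// # Example
    ///
    /// A sketch of a point query through an index, assuming `index_id` identifies
    /// an index on a `u32` column (all names are illustrative):
    ///
    /// ```ignore
    /// let matches: Vec<ProductValue> = table
    ///     .get_index_by_id_with_table(blob_store, index_id)
    ///     .expect("index should exist")
    ///     .seek_point(&AlgebraicValue::U32(42))
    ///     .map(|row_ref| row_ref.to_product_value())
    ///     .collect();
    /// ```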
1899    pub fn seek_point(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
1900        IndexScanPointIter {
1901            table: self.table,
1902            blob_store: self.blob_store,
1903            btree_index_iter: self.index.seek_point(key),
1904        }
1905    }
1906
1907    /// Returns an iterator yielding all rows in this index that fall within `range`.
1908    ///
1909    /// Matching is defined by `Ord for AlgebraicValue`.
1910    pub fn seek_range(&self, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanRangeIter<'a> {
1911        IndexScanRangeIter {
1912            table: self.table,
1913            blob_store: self.blob_store,
1914            btree_index_iter: self.index.seek_range(range),
1915        }
1916    }
1917}
1918
1919/// An iterator using a [`TableIndex`] to scan a `table`
1920/// for all the [`RowRef`]s matching the specified `key` in the indexed column(s).
1921///
1922/// Matching is defined by `Ord for AlgebraicValue`.
1923pub struct IndexScanPointIter<'a> {
1924    /// The table being scanned for rows.
1925    table: &'a Table,
1926    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1927    blob_store: &'a dyn BlobStore,
1928    /// The iterator performing the index scan yielding row pointers.
1929    btree_index_iter: TableIndexPointIter<'a>,
1930}
1931
1932impl<'a> IndexScanPointIter<'a> {
1933    /// Consume the iterator, returning the inner one.
1934    pub fn index(self) -> TableIndexPointIter<'a> {
1935        self.btree_index_iter
1936    }
1937}
1938
1939impl<'a> Iterator for IndexScanPointIter<'a> {
1940    type Item = RowRef<'a>;
1941
1942    fn next(&mut self) -> Option<Self::Item> {
1943        self.btree_index_iter.next().map(|ptr| {
1944            // SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table.
1945            unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1946        })
1947    }
1948}
1949
1950/// An iterator using a [`TableIndex`] to scan a `table`
1951/// for all the [`RowRef`]s matching the specified `range` in the indexed column(s).
1952///
1953/// Matching is defined by `Ord for AlgebraicValue`.
1954pub struct IndexScanRangeIter<'a> {
1955    /// The table being scanned for rows.
1956    table: &'a Table,
1957    /// The blob store; passed on to the [`RowRef`]s in case they need it.
1958    blob_store: &'a dyn BlobStore,
1959    /// The iterator performing the index scan yielding row pointers.
1960    btree_index_iter: TableIndexRangeIter<'a>,
1961}
1962
1963impl<'a> Iterator for IndexScanRangeIter<'a> {
1964    type Item = RowRef<'a>;
1965
1966    fn next(&mut self) -> Option<Self::Item> {
1967        self.btree_index_iter.next().map(|ptr| {
1968            // SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table.
1969            unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
1970        })
1971    }
1972}
1973
1974#[derive(Error, Debug, PartialEq, Eq)]
1975#[error("Unique constraint violation '{}' in table '{}': column(s): '{:?}' value: {}", constraint_name, table_name, cols, value.to_satn())]
1976pub struct UniqueConstraintViolation {
1977    pub constraint_name: Box<str>,
1978    pub table_name: Box<str>,
1979    pub cols: Vec<Box<str>>,
1980    pub value: AlgebraicValue,
1981}
1982
1983impl UniqueConstraintViolation {
1984    /// Returns a unique constraint violation error for the given `index`
1985    /// and the `value` that would have been duplicated.
1986    ///
1987    /// In this version, the [`IndexSchema`] is looked up in `schema` based on `index_id`.
1988    #[cold]
1989    fn build(schema: &TableSchema, index: &TableIndex, index_id: IndexId, value: AlgebraicValue) -> Self {
1990        let index_schema = schema.indexes.iter().find(|i| i.index_id == index_id).unwrap();
1991        Self::build_with_index_schema(schema, index, index_schema, value)
1992    }
1993
1994    /// Returns a unique constraint violation error for the given `index`
1995    /// and the `value` that would have been duplicated.
1996    ///
1997    /// In this version, the `index_schema` is explicitly passed.
1998    #[cold]
1999    pub fn build_with_index_schema(
2000        schema: &TableSchema,
2001        index: &TableIndex,
2002        index_schema: &IndexSchema,
2003        value: AlgebraicValue,
2004    ) -> Self {
2005        // Fetch the table name.
2006        let table_name = schema.table_name.clone();
2007
2008        // Fetch the names of the columns used in the index.
2009        let cols = schema
2010            .get_columns(&index.indexed_columns)
2011            .map(|(_, cs)| cs.unwrap().col_name.clone())
2012            .collect();
2013
2014        // Fetch the name of the index.
2015        let constraint_name = index_schema.index_name.clone();
2016
2017        Self {
2018            constraint_name,
2019            table_name,
2020            cols,
2021            value,
2022        }
2023    }
2024}
2025
2026// Private API:
2027impl Table {
2028    /// Returns a unique constraint violation error for the given `index`
2029    /// and the `value` that would have been duplicated.
2030    #[cold]
2031    pub fn build_error_unique(
2032        &self,
2033        index: &TableIndex,
2034        index_id: IndexId,
2035        value: AlgebraicValue,
2036    ) -> UniqueConstraintViolation {
2037        let schema = self.get_schema();
2038        UniqueConstraintViolation::build(schema, index, index_id, value)
2039    }
2040
2041    /// Returns a new empty table using the particulars passed.
2042    fn new_raw(
2043        schema: Arc<TableSchema>,
2044        row_layout: RowTypeLayout,
2045        static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
2046        visitor_prog: VarLenVisitorProgram,
2047        squashed_offset: SquashedOffset,
2048        pointer_map: Option<PointerMap>,
2049    ) -> Self {
2050        Self {
2051            inner: TableInner {
2052                row_layout,
2053                static_layout,
2054                visitor_prog,
2055                pages: Pages::default(),
2056            },
2057            is_scheduler: schema.schedule.is_some(),
2058            schema,
2059            indexes: BTreeMap::new(),
2060            pointer_map,
2061            squashed_offset,
2062            row_count: 0,
2063            blob_store_bytes: BlobNumBytes::default(),
2064        }
2065    }
2066
2067    /// Returns whether the row at `ptr` is present or not.
2068    // TODO: Remove all uses of this method,
2069    //       or more likely, gate them behind `debug_assert!`
2070    //       so they don't have semantic meaning.
2071    //
2072    //       Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
2073    //       `RowPointer` is not content-addressed.
2074    //       This means it is possible to:
2075    //       - have a `RowPointer` A* to row A,
2076    //       - Delete row A,
2077    //       - Insert row B into the same storage as freed from A,
2078    //       - Test `is_row_present(A*)`, which falsely reports that row A is still present.
2079    //
2080    //       In the final interface, this method is superfluous anyway,
2081    //       as `RowPointer` is not part of our public interface.
2082    //       Instead, we will always discover a known-present `RowPointer`
2083    //       during a table scan or index seek.
2084    //       As such, our `delete` and `insert` methods can be `unsafe`
2085    //       and trust that the `RowPointer` is valid.
2086    fn is_row_present(&self, ptr: RowPointer) -> bool {
2087        if self.squashed_offset != ptr.squashed_offset() {
2088            return false;
2089        }
2090        let Some((page, offset)) = self.inner.try_page_and_offset(ptr) else {
2091            return false;
2092        };
2093        page.has_row_offset(self.row_size(), offset)
2094    }
2095
2096    /// Returns the row size for a row in the table.
2097    pub fn row_size(&self) -> Size {
2098        self.row_layout().size()
2099    }
2100
2101    /// Returns the layout for a row in the table.
2102    fn row_layout(&self) -> &RowTypeLayout {
2103        &self.inner.row_layout
2104    }
2105
2106    /// Returns the pages storing the physical rows of this table.
2107    fn pages(&self) -> &Pages {
2108        &self.inner.pages
2109    }
2110
2111    /// Iterates over each [`Page`] in this table, ensuring that its hash is computed before yielding it.
2112    ///
2113    /// Used when capturing a snapshot.
2114    pub fn iter_pages_with_hashes(&mut self) -> impl Iterator<Item = (blake3::Hash, &Page)> {
2115        self.inner.pages.iter_mut().map(|page| {
2116            let hash = page.save_or_get_content_hash();
2117            (hash, &**page)
2118        })
2119    }
2120
2121    /// Returns the number of pages storing the physical rows of this table.
2122    fn num_pages(&self) -> usize {
2123        self.inner.pages.len()
2124    }
2125
2126    /// Returns the [`StaticLayout`] for this table, if any.
2127    pub(crate) fn static_layout(&self) -> Option<&StaticLayout> {
2128        self.inner.static_layout.as_ref().map(|(s, _)| s)
2129    }
2130
2131    /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them.
2132    ///
2133    /// Called when restoring from a snapshot after installing the pages
2134    /// and after computing the row count,
2135    /// since snapshots do not save the pointer map.
2136    fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) -> PointerMap {
2137        // TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`.
2138        // Alternatively, do this at the same time as `compute_row_count`.
2139        self.scan_rows(blob_store)
2140            .map(|row_ref| (row_ref.row_hash(), row_ref.pointer()))
2141            .collect()
2142    }
2143
2144    /// Compute and store `self.row_count` and `self.blob_store_bytes`
2145    /// by iterating over all the rows in `self` and counting them.
2146    ///
2147    /// Called when restoring from a snapshot after installing the pages,
2148    /// since snapshots do not save this metadata.
2149    fn compute_row_count(&mut self, blob_store: &dyn BlobStore) {
2150        let mut row_count = 0;
2151        let mut blob_store_bytes = 0;
2152        for row in self.scan_rows(blob_store) {
2153            row_count += 1;
2154            blob_store_bytes += row.blob_store_bytes();
2155        }
2156        self.row_count = row_count as u64;
2157        self.blob_store_bytes = blob_store_bytes.into();
2158    }
2159}
2160
2161#[cfg(test)]
2162pub(crate) mod test {
2163    use super::*;
2164    use crate::blob_store::{HashMapBlobStore, NullBlobStore};
2165    use crate::page::tests::hash_unmodified_save_get;
2166    use crate::var_len::VarLenGranule;
2167    use proptest::prelude::*;
2168    use proptest::test_runner::TestCaseResult;
2169    use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder};
2170    use spacetimedb_primitives::{col_list, TableId};
2171    use spacetimedb_sats::bsatn::to_vec;
2172    use spacetimedb_sats::proptest::{generate_typed_row, generate_typed_row_vec};
2173    use spacetimedb_sats::{product, AlgebraicType, ArrayValue};
2174    use spacetimedb_schema::def::{BTreeAlgorithm, ModuleDef};
2175    use spacetimedb_schema::schema::Schema as _;
2176
2177    /// Create a `Table` from a `ProductType` without validation.
2178    pub(crate) fn table(ty: ProductType) -> Table {
2179        // Use a fast path here to avoid slowing down Miri in the proptests.
2180        // Does not perform validation.
2181        let schema = TableSchema::from_product_type(ty);
2182        Table::new(schema.into(), SquashedOffset::COMMITTED_STATE)
2183    }
2184
2185    #[test]
2186    fn unique_violation_error() {
2187        let table_name = "UniqueIndexed";
2188        let index_name = "UniqueIndexed_unique_col_idx_btree";
2189        let mut builder = RawModuleDefV9Builder::new();
2190        builder
2191            .build_table_with_new_type(
2192                table_name,
2193                ProductType::from([("unique_col", AlgebraicType::I32), ("other_col", AlgebraicType::I32)]),
2194                true,
2195            )
2196            .with_unique_constraint(0)
2197            .with_index(
2198                RawIndexAlgorithm::BTree { columns: col_list![0] },
2199                "accessor_name_doesnt_matter",
2200            );
2201
2202        let def: ModuleDef = builder.finish().try_into().expect("Failed to build schema");
2203
2204        let schema = TableSchema::from_module_def(&def, def.table(table_name).unwrap(), (), TableId::SENTINEL);
2205        assert_eq!(schema.indexes.len(), 1);
2206        let index_schema = schema.indexes[0].clone();
2207
2208        let mut table = Table::new(schema.into(), SquashedOffset::COMMITTED_STATE);
2209        let pool = PagePool::new_for_test();
2210        let cols = ColList::new(0.into());
2211        let algo = BTreeAlgorithm { columns: cols.clone() }.into();
2212
2213        let index = table.new_index(&algo, true).unwrap();
2214        // SAFETY: Index was derived from `table`.
2215        unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) };
2216
2217        // Reserve a page so that we can check the hash.
2218        let pi = table.inner.pages.reserve_empty_page(&pool, table.row_size()).unwrap();
2219        let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
2220
2221        // Insert the row (0, 0).
2222        table
2223            .insert(&pool, &mut NullBlobStore, &product![0i32, 0i32])
2224            .expect("Initial insert failed");
2225
2226        // Inserting cleared the hash.
2227        let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
2228        assert_ne!(hash_pre_ins, hash_post_ins);
2229
2230        // Try to insert the row (0, 1), and assert that we get the expected error.
2231        match table.insert(&pool, &mut NullBlobStore, &product![0i32, 1i32]) {
2232            Ok(_) => panic!("Second insert with same unique value succeeded"),
2233            Err(InsertError::IndexError(UniqueConstraintViolation {
2234                constraint_name,
2235                table_name,
2236                cols,
2237                value,
2238            })) => {
2239                assert_eq!(&*constraint_name, index_name);
2240                assert_eq!(&*table_name, "UniqueIndexed");
2241                assert_eq!(cols.iter().map(|c| c.to_string()).collect::<Vec<_>>(), &["unique_col"]);
2242                assert_eq!(value, AlgebraicValue::I32(0));
2243            }
2244            Err(e) => panic!("Expected UniqueConstraintViolation but found {e:?}"),
2245        }
2246
2247        // The second insert cleared the hash even though it hit a constraint violation,
2248        // as constraint checking is done after the physical insertion, which is then rolled back.
2249        assert_eq!(table.inner.pages[pi].unmodified_hash(), None);
2250    }
2251
2252    fn insert_retrieve_body(ty: impl Into<ProductType>, val: impl Into<ProductValue>) -> TestCaseResult {
2253        let val = val.into();
2254        let pool = PagePool::new_for_test();
2255        let mut blob_store = HashMapBlobStore::default();
2256        let mut table = table(ty.into());
2257        let (hash, row) = table.insert(&pool, &mut blob_store, &val).unwrap();
2258        let hash = hash.unwrap();
2259        prop_assert_eq!(row.row_hash(), hash);
2260        let ptr = row.pointer();
2261        prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2262
2263        prop_assert_eq!(table.inner.pages.len(), 1);
2264        prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2265
2266        let row_ref = table.get_row_ref(&blob_store, ptr).unwrap();
2267        prop_assert_eq!(row_ref.to_product_value(), val.clone());
2268        let bsatn_val = to_vec(&val).unwrap();
2269        prop_assert_eq!(&bsatn_val, &to_vec(&row_ref).unwrap());
2270        prop_assert_eq!(&bsatn_val, &row_ref.to_bsatn_vec().unwrap());
2271
2272        prop_assert_eq!(
2273            &table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(),
2274            &[ptr]
2275        );
2276
2277        Ok(())
2278    }
2279
2280    #[test]
2281    fn repro_serialize_bsatn_empty_array() {
2282        let ty = AlgebraicType::array(AlgebraicType::U64);
2283        let arr = ArrayValue::from(Vec::<u64>::new().into_boxed_slice());
2284        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2285    }
2286
2287    #[test]
2288    fn repro_serialize_bsatn_debug_assert() {
2289        let ty = AlgebraicType::array(AlgebraicType::U64);
2290        let arr = ArrayValue::from((0..130u64).collect::<Box<_>>());
2291        insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2292    }
2293
2294    fn reconstruct_index_num_key_bytes(table: &Table, blob_store: &dyn BlobStore, index_id: IndexId) -> u64 {
2295        let index = table.get_index_by_id(index_id).unwrap();
2296
2297        index
2298            .seek_range(&(..))
2299            .map(|row_ptr| {
2300                let row_ref = table.get_row_ref(blob_store, row_ptr).unwrap();
2301                let key = row_ref.project(&index.indexed_columns).unwrap();
2302                crate::table_index::KeySize::key_size_in_bytes(&key) as u64
2303            })
2304            .sum()
2305    }
2306
2307    /// Given a row type `ty`, a set of rows of that type `vals`,
2308    /// and a set of columns within that type `indexed_columns`,
2309    /// populate a table with `vals`, add an index on the `indexed_columns`,
2310    /// and perform various assertions that the reported index size metrics are correct.
2311    fn test_index_size_reporting(
2312        ty: ProductType,
2313        vals: Vec<ProductValue>,
2314        indexed_columns: ColList,
2315    ) -> Result<(), TestCaseError> {
2316        let pool = PagePool::new_for_test();
2317        let mut blob_store = HashMapBlobStore::default();
2318        let mut table = table(ty.clone());
2319
2320        for row in &vals {
2321            prop_assume!(table.insert(&pool, &mut blob_store, row).is_ok());
2322        }
2323
2324        // We haven't added any indexes yet, so there should be 0 rows in indexes.
2325        prop_assert_eq!(table.num_rows_in_indexes(), 0);
2326
2327        let index_id = IndexId(0);
2328
2329        let algo = BTreeAlgorithm {
2330            columns: indexed_columns.clone(),
2331        }
2332        .into();
2333        let index = TableIndex::new(&ty, &algo, false).unwrap();
2334        // Add an index on `indexed_columns`.
2335        // Safety:
2336        // We're using `ty` as the row type for both `table` and the new index.
2337        unsafe { table.insert_index(&blob_store, index_id, index) };
2338
2339        // We have one index, which should be fully populated,
2340        // so in total we should have the same number of rows in indexes as we have rows.
2341        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows());
2342
2343        let index = table.get_index_by_id(index_id).unwrap();
2344
2345        // One index, so table's reporting of bytes used should match that index's reporting.
2346        prop_assert_eq!(table.bytes_used_by_index_keys(), index.num_key_bytes());
2347
2348        // Walk all the rows in the index, sum their key size,
2349        // and assert it matches the `index.num_key_bytes()`
2350        prop_assert_eq!(
2351            index.num_key_bytes(),
2352            reconstruct_index_num_key_bytes(&table, &blob_store, index_id)
2353        );
2354
2355        // Walk all the rows we inserted, project them to the cols that will be their keys,
2356        // sum their key size,
2357        // and assert it matches the `index.num_key_bytes()`
2358        let key_size_in_pvs = vals
2359            .iter()
2360            .map(|row| crate::table_index::KeySize::key_size_in_bytes(&row.project(&indexed_columns).unwrap()) as u64)
2361            .sum();
2362        prop_assert_eq!(index.num_key_bytes(), key_size_in_pvs);
2363
2364        let algo = BTreeAlgorithm {
2365            columns: indexed_columns,
2366        }
2367        .into();
2368        let index = TableIndex::new(&ty, &algo, false).unwrap();
2369        // Add a duplicate of the same index, so we can check that all above quantities double.
2370        // Safety:
2371        // As above, we're using `ty` as the row type for both `table` and the new index.
2372        unsafe { table.insert_index(&blob_store, IndexId(1), index) };
2373
2374        prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2);
2375        prop_assert_eq!(table.bytes_used_by_index_keys(), key_size_in_pvs * 2);
2376
2377        Ok(())
2378    }
2379
2380    proptest! {
2381        #![proptest_config(ProptestConfig { max_shrink_iters: 0x10000000, ..Default::default() })]
2382
2383        #[test]
2384        fn insert_retrieve((ty, val) in generate_typed_row()) {
2385            insert_retrieve_body(ty, val)?;
2386        }
2387
2388        #[test]
2389        fn insert_delete_removed_from_pointer_map((ty, val) in generate_typed_row()) {
2390            let pool = PagePool::new_for_test();
2391            let mut blob_store = HashMapBlobStore::default();
2392            let mut table = table(ty);
2393            let (hash, row) = table.insert(&pool, &mut blob_store, &val).unwrap();
2394            let hash = hash.unwrap();
2395            prop_assert_eq!(row.row_hash(), hash);
2396            let ptr = row.pointer();
2397            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2398
2399            prop_assert_eq!(table.inner.pages.len(), 1);
2400            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2401            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2402            prop_assert_eq!(table.row_count, 1);
2403
2404            let hash_pre_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2405
2406            table.delete(&mut blob_store, ptr, |_| ());
2407
2408            let hash_post_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2409            assert_ne!(hash_pre_del, hash_post_del);
2410
2411            prop_assert_eq!(table.pointers_for(hash), &[]);
2412
2413            prop_assert_eq!(table.inner.pages.len(), 1);
2414            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 0);
2415            prop_assert_eq!(table.row_count, 0);
2416
2417            prop_assert!(&table.scan_rows(&blob_store).next().is_none());
2418        }
2419
2420        #[test]
2421        fn insert_duplicate_set_semantic((ty, val) in generate_typed_row()) {
2422            let pool = PagePool::new_for_test();
2423            let mut blob_store = HashMapBlobStore::default();
2424            let mut table = table(ty);
2425
2426            let (hash, row) = table.insert(&pool, &mut blob_store, &val).unwrap();
2427            let hash = hash.unwrap();
2428            prop_assert_eq!(row.row_hash(), hash);
2429            let ptr = row.pointer();
2430            prop_assert_eq!(table.inner.pages.len(), 1);
2431            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2432            prop_assert_eq!(table.row_count, 1);
2433            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2434
2435            let blob_uses = blob_store.usage_counter();
2436
2437            let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2438
2439            prop_assert!(table.insert(&pool, &mut blob_store, &val).is_err());
2440
2441            // Hash was cleared and is different despite failure to insert.
2442            let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2443            assert_ne!(hash_pre_ins, hash_post_ins);
2444
2445            prop_assert_eq!(table.row_count, 1);
2446            prop_assert_eq!(table.inner.pages.len(), 1);
2447            prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2448
2449            let blob_uses_after = blob_store.usage_counter();
2450
2451            prop_assert_eq!(blob_uses_after, blob_uses);
2452            prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2453            prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2454        }
2455
2456        #[test]
2457        fn insert_bsatn_same_as_pv((ty, val) in generate_typed_row()) {
2458            let pool = PagePool::new_for_test();
2459            let mut bs_pv = HashMapBlobStore::default();
2460            let mut table_pv = table(ty.clone());
2461            let res_pv = table_pv.insert(&pool, &mut bs_pv, &val);
2462
2463            let mut bs_bsatn = HashMapBlobStore::default();
2464            let mut table_bsatn = table(ty);
2465            let res_bsatn = insert_bsatn(&mut table_bsatn, &mut bs_bsatn, &val);
2466
2467            prop_assert_eq!(res_pv, res_bsatn);
2468            prop_assert_eq!(bs_pv, bs_bsatn);
2469            prop_assert_eq!(table_pv, table_bsatn);
2470        }
2471
        #[test]
        fn row_size_reporting_matches_slow_implementations((ty, vals) in generate_typed_row_vec(128, 2048)) {
            let pool = PagePool::new_for_test();
            let mut blob_store = HashMapBlobStore::default();
            let mut table = table(ty.clone());

            for row in &vals {
                prop_assume!(table.insert(&pool, &mut blob_store, row).is_ok());
            }

            prop_assert_eq!(table.bytes_used_by_rows(), table.reconstruct_bytes_used_by_rows());
            prop_assert_eq!(table.num_rows(), table.reconstruct_num_rows());
            prop_assert_eq!(table.num_rows(), vals.len() as u64);

            // TODO(testing): Determine if there's a meaningful way to test that the blob store reporting is correct.
            // I (pgoldman 2025-01-27) doubt it, as the test would be "visit every blob and sum their size,"
            // which is already what the actual implementation does.
        }

        #[test]
        fn index_size_reporting_matches_slow_implementations_single_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
            prop_assume!(!ty.elements.is_empty());

            test_index_size_reporting(ty, vals, ColList::from(ColId(0)))?;
        }

        #[test]
        fn index_size_reporting_matches_slow_implementations_two_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
            prop_assume!(ty.elements.len() >= 2);

            test_index_size_reporting(ty, vals, ColList::from([ColId(0), ColId(1)]))?;
        }
    }

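    /// Inserts `val` into `table` via its BSATN encoding rather than as a [`ProductValue`],
    /// mirroring the shape of [`Table::insert`],
    /// so that `insert_bsatn_same_as_pv` above can compare the two insertion paths.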
    fn insert_bsatn<'a>(
        table: &'a mut Table,
        blob_store: &'a mut dyn BlobStore,
        val: &ProductValue,
    ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
        let row = &to_vec(&val).unwrap();

        // Optimistically insert the `row` before checking any constraints
        // under the assumption that errors (unique constraint & set semantic violations) are rare.
        let pool = PagePool::new_for_test();
        let (row_ref, blob_bytes) = table.insert_physically_bsatn(&pool, blob_store, row)?;
        let row_ptr = row_ref.pointer();

        // Confirm the insertion, checking any constraints, removing the physical row on error.
        // SAFETY: We just inserted `row_ptr`, so it must be present.
        let (hash, row_ptr) = unsafe { table.confirm_insertion::<true>(blob_store, row_ptr, blob_bytes) }?;
        // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
        let row_ref = unsafe { table.get_row_ref_unchecked(blob_store, row_ptr) };
        Ok((hash, row_ref))
    }

    // Compare `scan_rows` against a simpler implementation.
    #[test]
    fn table_scan_iter_eq_flatmap() {
        let pool = PagePool::new_for_test();
        let mut blob_store = HashMapBlobStore::default();
        let mut table = table(AlgebraicType::U64.into());
        for v in 0..2u64.pow(14) {
            table.insert(&pool, &mut blob_store, &product![v]).unwrap();
        }

        let complex = table.scan_rows(&blob_store).map(|r| r.pointer());
        let simple = table
            .inner
            .pages
            .iter()
            .zip((0..).map(PageIndex))
            .flat_map(|(page, pi)| {
                page.iter_fixed_len(table.row_size())
                    .map(move |po| RowPointer::new(false, pi, po, table.squashed_offset))
            });
        assert!(complex.eq(simple));
    }

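    // Reading through a `RowPointer` whose `PageOffset` points into the middle of a row
    // must panic (and be Miri-clean) rather than silently misinterpret the page contents.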
    #[test]
    #[should_panic]
    fn read_row_unaligned_page_offset_soundness() {
        // Insert a `u64` into a table.
        let pt = AlgebraicType::U64.into();
        let pv = product![42u64];
        let mut table = table(pt);
        let pool = &PagePool::new_for_test();
        let blob_store = &mut NullBlobStore;
        let (_, row_ref) = table.insert(pool, blob_store, &pv).unwrap();

        // Manipulate the page offset to 1 instead of 0.
        // This now points into the "middle" of a row.
        let ptr = row_ref.pointer().with_page_offset(PageOffset(1));

        // We expect this to panic.
        // Miri should not have any issue with this call either.
        table.get_row_ref(&NullBlobStore, ptr).unwrap().to_product_value();
    }

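    // `blob_store_bytes` tracks the bytes of large var-len objects owned by a table:
    // each inserted copy is counted even when `HashMapBlobStore` deduplicates the blob,
    // and the count drops again as the corresponding rows are deleted.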
    #[test]
    fn test_blob_store_bytes() {
        let pt: ProductType = [AlgebraicType::String, AlgebraicType::I32].into();
        let pool = &PagePool::new_for_test();
        let blob_store = &mut HashMapBlobStore::default();
        let mut insert = |table: &mut Table, string, num| {
            table
                .insert(pool, blob_store, &product![string, num])
                .unwrap()
                .1
                .pointer()
        };
        let mut table1 = table(pt.clone());

        // Insert a short string; `blob_store_bytes` should be 0.
        let short_str = std::str::from_utf8(&[98; 6]).unwrap();
        let short_row_ptr = insert(&mut table1, short_str, 0);
        assert_eq!(table1.blob_store_bytes.0, 0);

        // Insert a long string; `blob_store_bytes` should be the length of the string.
        const BLOB_OBJ_LEN: BlobNumBytes = BlobNumBytes(VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD + 1);
        let long_str = std::str::from_utf8(&[98; BLOB_OBJ_LEN.0]).unwrap();
        let long_row_ptr = insert(&mut table1, long_str, 0);
        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);

        // Insert the previous long string again into the same table;
        // `blob_store_bytes` should count its length twice,
        // even though `HashMapBlobStore` deduplicates the blob.
        let long_row_ptr2 = insert(&mut table1, long_str, 1);
        const BLOB_OBJ_LEN_2X: BlobNumBytes = BlobNumBytes(BLOB_OBJ_LEN.0 * 2);
        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);

        // Insert the previous long string into a new table;
        // `blob_store_bytes` should report its length there as well,
        // even though `HashMapBlobStore` deduplicates the blob.
        let mut table2 = table(pt);
        let _ = insert(&mut table2, long_str, 0);
        assert_eq!(table2.blob_store_bytes, BLOB_OBJ_LEN);

        // Delete the `short_str` row. This should not affect the byte count.
        table1.delete(blob_store, short_row_ptr, |_| ()).unwrap();
        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);

        // Delete the first long string row. This gets us down to `BLOB_OBJ_LEN` (we had 2x before).
        table1.delete(blob_store, long_row_ptr, |_| ()).unwrap();
        assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);

        // Delete the second long string row. This gets us down to 0 (both long rows are now deleted).
        table1.delete(blob_store, long_row_ptr2, |_| ()).unwrap();
        assert_eq!(table1.blob_store_bytes, 0.into());
    }
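
    // A minimal additional sketch (not part of the original suite): after deleting a row,
    // re-inserting the identical value should succeed again under set semantics.
    // It uses only the `Table` APIs exercised by the tests above; the test name is hypothetical.
    #[test]
    fn reinsert_after_delete_sketch() {
        let pool = PagePool::new_for_test();
        let blob_store = &mut HashMapBlobStore::default();
        let mut table = table(AlgebraicType::U64.into());
        let row = product![42u64];

        // First insertion stores the row and yields a pointer to it.
        let (_, row_ref) = table.insert(&pool, blob_store, &row).unwrap();
        let ptr = row_ref.pointer();
        assert_eq!(table.row_count, 1);

        // Deleting the row removes it, so the identical value may be inserted again.
        table.delete(blob_store, ptr, |_| ()).unwrap();
        assert_eq!(table.row_count, 0);
        assert!(table.insert(&pool, blob_store, &row).is_ok());
        assert_eq!(table.row_count, 1);
    }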

    /// Assert that calling `get_row_ref` with a `RowPointer` that does not point to an existing row
    /// returns `None` rather than panicking.
    #[test]
    fn get_row_ref_no_panic() {
        let blob_store = &mut HashMapBlobStore::default();
        let table = table([AlgebraicType::String, AlgebraicType::I32].into());

        // This row pointer has an incorrect `SquashedOffset`, and so does not point into `table`.
        assert!(table
            .get_row_ref(
                blob_store,
                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::TX_STATE),
            )
            .is_none());

        // This row pointer has the correct `SquashedOffset`, but points out-of-bounds within `table`.
        assert!(table
            .get_row_ref(
                blob_store,
                RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::COMMITTED_STATE),
            )
            .is_none());
    }
}