spacetimedb_table/table.rs
1use crate::{
2 bflatn_to::write_row_to_pages_bsatn,
3 layout::AlgebraicTypeLayout,
4 static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator},
5 table_index::TableIndexPointIter,
6};
7
8use super::{
9 bflatn_from::serialize_row_from_page,
10 bflatn_to::{write_row_to_pages, Error},
11 blob_store::BlobStore,
12 eq::eq_row_in_page,
13 eq_to_pv::eq_row_in_page_to_pv,
14 indexes::{Bytes, PageIndex, PageOffset, RowHash, RowPointer, Size, SquashedOffset, PAGE_DATA_SIZE},
15 layout::RowTypeLayout,
16 page::{FixedLenRowsIter, Page},
17 pages::Pages,
18 pointer_map::PointerMap,
19 read_column::{ReadColumn, TypeError},
20 row_hash::hash_row_in_page,
21 row_type_visitor::{row_type_visitor, VarLenVisitorProgram},
22 static_assert_size,
23 static_layout::StaticLayout,
24 table_index::{TableIndex, TableIndexRangeIter},
25 var_len::VarLenMembers,
26 MemoryUsage,
27};
28use core::ops::RangeBounds;
29use core::{fmt, ptr};
30use core::{
31 hash::{Hash, Hasher},
32 hint::unreachable_unchecked,
33};
34use derive_more::{Add, AddAssign, From, Sub, SubAssign};
35use enum_as_inner::EnumAsInner;
36use smallvec::SmallVec;
37use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned};
38use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId};
39use spacetimedb_sats::{
40 algebraic_value::ser::ValueSerializer,
41 bsatn::{self, ser::BsatnError, ToBsatn},
42 i256,
43 product_value::InvalidFieldError,
44 satn::Satn,
45 ser::{Serialize, Serializer},
46 u256, AlgebraicValue, ProductType, ProductValue,
47};
48use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema, type_for_generate::PrimitiveType};
49use std::{
50 collections::{btree_map, BTreeMap},
51 sync::Arc,
52};
53use thiserror::Error;
54
55/// The number of bytes used by, added to, or removed from a [`Table`]'s share of a [`BlobStore`].
56#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign, SubAssign)]
57pub struct BlobNumBytes(usize);
58
59impl MemoryUsage for BlobNumBytes {}
60
61pub type SeqIdList = SmallVec<[SequenceId; 4]>;
62static_assert_size!(SeqIdList, 24);
63
64/// A database table containing the row schema, the rows, and indices.
65///
66/// The table stores the rows into a page manager
67/// and uses an internal map to ensure that no identical row is stored more than once.
68#[derive(Debug, PartialEq, Eq)]
69pub struct Table {
70 /// Page manager and row layout grouped together, for `RowRef` purposes.
71 inner: TableInner,
72 /// Maps `RowHash -> [RowPointer]` where a [`RowPointer`] points into `pages`.
73 /// A [`PointerMap`] is effectively a specialized unique index on all the columns.
74 ///
75 /// In tables without any other unique constraints,
76 /// the pointer map is used to enforce set semantics,
77 /// i.e. to prevent duplicate rows.
78 /// If `self.indexes` contains at least one unique index,
79 /// duplicate rows are impossible regardless, so this will be `None`.
80 pointer_map: Option<PointerMap>,
81 /// The indices associated with a set of columns of the table.
82 pub indexes: BTreeMap<IndexId, TableIndex>,
83 /// The schema of the table, from which the type, and other details are derived.
84 pub schema: Arc<TableSchema>,
85 /// `SquashedOffset::TX_STATE` or `SquashedOffset::COMMITTED_STATE`
86 /// depending on whether this is a tx scratchpad table
87 /// or a committed table.
88 squashed_offset: SquashedOffset,
89 /// Store number of rows present in table.
90 pub row_count: u64,
91 /// Stores the sum total number of bytes that each blob object in the table occupies.
92 ///
93 /// Note that the [`HashMapBlobStore`] does ref-counting and de-duplication,
94 /// but this sum will count an object each time its hash is mentioned, rather than just once.
95 blob_store_bytes: BlobNumBytes,
96 /// Indicates whether this is a scheduler table or not.
97 ///
98 /// This is an optimization to avoid checking the schema in e.g., `InstanceEnv::{insert, update}`.
99 is_scheduler: bool,
100}
101
102/// The part of a `Table` concerned only with storing rows.
103///
104/// Separated from the "outer" parts of `Table`, especially the `indexes`,
105/// so that `RowRef` can borrow only the `TableInner`,
106/// while other mutable references to the `indexes` exist.
107/// This is necessary because index insertions and deletions take a `RowRef` as an argument,
108/// from which they [`ReadColumn::read_column`] their keys.
109#[derive(Debug, PartialEq, Eq)]
110pub(crate) struct TableInner {
111 /// The type of rows this table stores, with layout information included.
112 row_layout: RowTypeLayout,
113 /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion,
114 /// if the [`RowTypeLayout`] has a static BSATN length and layout.
115 ///
116 /// A [`StaticBsatnValidator`] is also included.
117 /// It's used to validate BSATN-encoded rows before converting to BFLATN.
118 static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
119 /// The visitor program for `row_layout`.
120 ///
121 /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it.
122 visitor_prog: VarLenVisitorProgram,
123 /// The page manager that holds rows
124 /// including both their fixed and variable components.
125 pages: Pages,
126}
127
128impl TableInner {
129 /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
130 ///
131 /// # Safety
132 ///
133 /// The requirement is that `table.is_row_present(ptr)` must hold,
134 /// where `table` is the `Table` which contains this `TableInner`.
135 /// That is, `ptr` must refer to a row within `self`
136 /// which was previously inserted and has not been deleted since.
137 ///
138 /// This means:
139 /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
140 /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
141 /// and must refer to a valid, live row in that page.
142 /// - The `SquashedOffset` of `ptr` must match the enclosing table's `table.squashed_offset`.
143 ///
144 /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
145 /// and has not been passed to [`Table::delete(table, ..)`]
146 /// is sufficient to demonstrate all of these properties.
147 unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
148 // SAFETY: Forward caller requirements.
149 unsafe { RowRef::new(self, blob_store, ptr) }
150 }
151
152 fn try_page_and_offset(&self, ptr: RowPointer) -> Option<(&Page, PageOffset)> {
153 (ptr.page_index().idx() < self.pages.len()).then(|| (&self.pages[ptr.page_index()], ptr.page_offset()))
154 }
155
156 /// Returns the page and page offset that `ptr` points to.
157 fn page_and_offset(&self, ptr: RowPointer) -> (&Page, PageOffset) {
158 self.try_page_and_offset(ptr).unwrap()
159 }
160}
161
162static_assert_size!(Table, 264);
163
164impl MemoryUsage for Table {
165 fn heap_usage(&self) -> usize {
166 let Self {
167 inner,
168 pointer_map,
169 indexes,
170 // MEMUSE: intentionally ignoring schema
171 schema: _,
172 squashed_offset,
173 row_count,
174 blob_store_bytes,
175 is_scheduler,
176 } = self;
177 inner.heap_usage()
178 + pointer_map.heap_usage()
179 + indexes.heap_usage()
180 + squashed_offset.heap_usage()
181 + row_count.heap_usage()
182 + blob_store_bytes.heap_usage()
183 + is_scheduler.heap_usage()
184 }
185}
186
187impl MemoryUsage for TableInner {
188 fn heap_usage(&self) -> usize {
189 let Self {
190 row_layout,
191 static_layout,
192 visitor_prog,
193 pages,
194 } = self;
195 row_layout.heap_usage() + static_layout.heap_usage() + visitor_prog.heap_usage() + pages.heap_usage()
196 }
197}
198
199/// There was already a row with the same value.
200#[derive(Error, Debug, PartialEq, Eq)]
201#[error("Duplicate insertion of row {0:?} violates set semantics")]
202pub struct DuplicateError(pub RowPointer);
203
204/// Various errors that can happen on table insertion.
205#[derive(Error, Debug, PartialEq, Eq, EnumAsInner)]
206pub enum InsertError {
207 /// There was already a row with the same value.
208 #[error(transparent)]
209 Duplicate(#[from] DuplicateError),
210
211 /// Couldn't write the row to the page manager.
212 #[error(transparent)]
213 Bflatn(#[from] super::bflatn_to::Error),
214
215 /// Some index-related error occurred.
216 #[error(transparent)]
217 IndexError(#[from] UniqueConstraintViolation),
218}
219
220/// Errors that can occur while trying to read a value via bsatn.
221#[derive(Error, Debug)]
222pub enum ReadViaBsatnError {
223 #[error(transparent)]
224 BSatnError(#[from] BsatnError),
225
226 #[error(transparent)]
227 DecodeError(#[from] DecodeError),
228}
229
230// Public API:
231impl Table {
232 /// Creates a new empty table with the given `schema` and `squashed_offset`.
233 pub fn new(schema: Arc<TableSchema>, squashed_offset: SquashedOffset) -> Self {
234 let row_layout: RowTypeLayout = schema.get_row_type().clone().into();
235 let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout)));
236 let visitor_prog = row_type_visitor(&row_layout);
237 // By default, we start off with an empty pointer map,
238 // which is removed when the first unique index is added.
239 let pm = Some(PointerMap::default());
240 Self::new_with_indexes_capacity(schema, row_layout, static_layout, visitor_prog, squashed_offset, pm)
241 }
242
243 /// Returns whether this is a scheduler table.
244 pub fn is_scheduler(&self) -> bool {
245 self.is_scheduler
246 }
247
248 /// Check if the `row` conflicts with any unique index on `self`,
249 /// and if there is a conflict, return `Err`.
250 ///
251 /// `is_deleted` is a predicate which, for a given row pointer,
252 /// returns true if and only if that row should be ignored.
253 /// While checking unique constraints against the committed state,
254 /// `MutTxId::insert` will ignore rows which are listed in the delete table.
255 ///
256 /// # Safety
257 ///
258 /// `row.row_layout() == self.row_layout()` must hold.
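///
/// A hedged sketch of a call site (not a doc-test; `row`, `committed_table`, and the
/// `delete_table` set of deleted pointers are assumptions for illustration):
///
/// ```ignore
/// // SAFETY: `row` was read from a table with the same row layout as `committed_table`.
/// unsafe {
///     committed_table.check_unique_constraints(
///         row,
///         |indexes| indexes,                  // consider every index
///         |ptr| delete_table.contains(ptr),   // ignore rows this tx has deleted
///     )
/// }?;
/// ```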
259 pub unsafe fn check_unique_constraints<'a, I: Iterator<Item = (&'a IndexId, &'a TableIndex)>>(
260 &'a self,
261 row: RowRef<'_>,
262 adapt: impl FnOnce(btree_map::Iter<'a, IndexId, TableIndex>) -> I,
263 mut is_deleted: impl FnMut(RowPointer) -> bool,
264 ) -> Result<(), UniqueConstraintViolation> {
265 for (&index_id, index) in adapt(self.indexes.iter()).filter(|(_, index)| index.is_unique()) {
266 // SAFETY: Caller promised that `row` has the same layout as `self`.
267 // Thus, as `index.indexed_columns` is in-bounds of `self`'s layout,
268 // it's also in-bounds of `row`'s layout.
269 let value = unsafe { row.project_unchecked(&index.indexed_columns) };
270 if index.seek_point(&value).next().is_some_and(|ptr| !is_deleted(ptr)) {
271 return Err(self.build_error_unique(index, index_id, value));
272 }
273 }
274 Ok(())
275 }
276
277 /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
278 ///
279 /// On success, returns the hash, if any, of the newly-inserted row,
280 /// and a `RowRef` referring to the row.
281 /// The hash is only computed if this table has a [`PointerMap`],
282 /// i.e., does not have any unique indexes.
283 /// If the table has unique indexes,
284 /// the returned `Option<RowHash>` will be `None`.
285 ///
286 /// When a row equal to `row` already exists in `self`,
287 /// returns `InsertError::Duplicate(existing_row_pointer)`,
288 /// where `existing_row_pointer` is a `RowPointer` which identifies the existing row.
289 /// In this case, the duplicate is not inserted,
290 /// but internal data structures may be altered in ways that affect performance and fragmentation.
291 ///
292 /// TODO(error-handling): describe errors from `write_row_to_pages` and return meaningful errors.
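///
/// A minimal usage sketch (not a doc-test; `schema`, `blob_store`, and `row` are
/// assumed to exist and to agree with the table's row type):
///
/// ```ignore
/// let mut table = Table::new(schema, SquashedOffset::TX_STATE);
/// match table.insert(&mut blob_store, &row) {
///     Ok((_hash, row_ref)) => {
///         // The row is now present; `row_ref.pointer()` identifies it.
///     }
///     Err(InsertError::Duplicate(DuplicateError(existing))) => {
///         // An equal row already exists at `existing`; nothing was inserted.
///     }
///     Err(other) => panic!("insertion failed: {other}"),
/// }
/// ```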
293 pub fn insert<'a>(
294 &'a mut self,
295 blob_store: &'a mut dyn BlobStore,
296 row: &ProductValue,
297 ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
298 // Optimistically insert the `row` before checking any constraints
299 // under the assumption that errors (unique constraint & set semantic violations) are rare.
300 let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?;
301 let row_ptr = row_ref.pointer();
302
303 // Confirm the insertion, checking any constraints, removing the physical row on error.
304 // SAFETY: We just inserted `ptr`, so it must be present.
305 let (hash, row_ptr) = unsafe { self.confirm_insertion(blob_store, row_ptr, blob_bytes) }?;
306 // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
307 let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, row_ptr) };
308 Ok((hash, row_ref))
309 }
310
311 /// Physically inserts `row` into the page
312 /// without inserting it logically into the pointer map.
313 ///
314 /// This is useful when we need to insert a row temporarily to get back a `RowPointer`.
315 /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
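///
/// A hedged sketch of the temporary-insert pattern described above (this mirrors what
/// [`Self::delete_equal_row`] does internally; the names are illustrative):
///
/// ```ignore
/// let (row_ref, _blob_bytes) = table.insert_physically_pv(&mut blob_store, &row)?;
/// let temp_ptr = row_ref.pointer();
/// // ... use `temp_ptr`, e.g. to probe indexes or compare rows ...
/// // SAFETY: `temp_ptr` was just inserted above and has not been deleted since.
/// unsafe { table.delete_internal_skip_pointer_map(&mut blob_store, temp_ptr) };
/// ```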
316 pub fn insert_physically_pv<'a>(
317 &'a mut self,
318 blob_store: &'a mut dyn BlobStore,
319 row: &ProductValue,
320 ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
321 // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
322 // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
323 let (ptr, blob_bytes) = unsafe {
324 write_row_to_pages(
325 &mut self.inner.pages,
326 &self.inner.visitor_prog,
327 blob_store,
328 &self.inner.row_layout,
329 row,
330 self.squashed_offset,
331 )
332 }?;
333 // SAFETY: We just inserted `ptr`, so it must be present.
334 let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
335
336 Ok((row_ref, blob_bytes))
337 }
338
339 /// Physically insert a `row`, encoded in BSATN, into this table,
340 /// storing its large var-len members in the `blob_store`.
341 ///
342 /// On success, returns the hash of the newly-inserted row,
343 /// and a `RowRef` referring to the row.
344 ///
345 /// This does not check for set semantic or unique constraints.
346 ///
347 /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`.
348 /// In this case, a call to this method should be followed by a call to [`delete_internal_skip_pointer_map`].
349 ///
350 /// When `row` is not valid BSATN at the table's row type,
351 /// an error is returned and there will be nothing for the caller to revert.
352 pub fn insert_physically_bsatn<'a>(
353 &'a mut self,
354 blob_store: &'a mut dyn BlobStore,
355 row: &[u8],
356 ) -> Result<(RowRef<'a>, BlobNumBytes), Error> {
357 // Got a static layout? => Use fast-path insertion.
358 let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() {
359 // Before inserting, validate the row, ensuring type safety.
360 // SAFETY: The `static_validator` was derived from the same row layout as the static layout.
361 unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?;
362
363 let fixed_row_size = self.inner.row_layout.size();
364 let squashed_offset = self.squashed_offset;
365 let res = self
366 .inner
367 .pages
368 .with_page_to_insert_row(fixed_row_size, 0, |page| {
369 // SAFETY: We've used the right `row_size` and we trust that others have too.
370 // `RowTypeLayout` also ensures that we satisfy the minimum row size.
371 let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?;
372 let (mut fixed, _) = page.split_fixed_var_mut();
373 let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size);
374 // SAFETY:
375 // - We've validated that `row` is of sufficient length.
376 // - The `fixed_buf` is exactly the right `fixed_row_size`.
377 unsafe { static_layout.deserialize_row_into(fixed_buf, row) };
378 Ok(fixed_offset)
379 })
380 .map_err(Error::PagesError)?;
381 match res {
382 (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()),
383 (_, Err(e)) => return Err(e),
384 }
385 } else {
386 // SAFETY: `self.pages` is known to be specialized for `self.row_layout`,
387 // as `self.pages` was constructed from `self.row_layout` in `Table::new`.
388 unsafe {
389 write_row_to_pages_bsatn(
390 &mut self.inner.pages,
391 &self.inner.visitor_prog,
392 blob_store,
393 &self.inner.row_layout,
394 row,
395 self.squashed_offset,
396 )
397 }?
398 };
399
400 // SAFETY: We just inserted `ptr`, so it must be present.
401 let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
402
403 Ok((row_ref, blob_bytes))
404 }
405
406 /// Returns all the columns with sequences that need generation for this `row`.
407 ///
408 /// # Safety
409 ///
410 /// `self.is_row_present(row)` must hold.
411 pub unsafe fn sequence_triggers_for<'a>(
412 &'a self,
413 blob_store: &'a dyn BlobStore,
414 row: RowPointer,
415 ) -> (ColList, SeqIdList) {
416 let sequences = &*self.get_schema().sequences;
417 let row_ty = self.row_layout().product();
418
419 // SAFETY: Caller promised that `self.is_row_present(row)` holds.
420 let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) };
421
422 sequences
423 .iter()
424 // Find all the sequences that are triggered by this row.
425 .filter(|seq| {
426 // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements`
427 // as `row_ty` was derived from the same schema as `seq` is part of.
428 let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) };
429 // SAFETY:
430 // - `elem_ty` appears as a column in the row type.
431 // - `AlgebraicValue` is compatible with all types.
432 let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) };
433 val.is_numeric_zero()
434 })
435 .map(|seq| (seq.col_pos, seq.sequence_id))
436 .unzip()
437 }
438
439 /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`.
440 ///
441 /// Truncates the `seq_val` to fit the type of the column.
442 ///
443 /// # Safety
444 ///
445 /// - `self.is_row_present(row)` must hold.
446 /// - `col_id` must be a valid column, with a primitive integer type, of the row type.
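///
/// A hedged illustration of the truncation, mirroring the `as` casts below:
///
/// ```ignore
/// // For a `u8` column, a generated value of 300 is stored as `300 as u8`, i.e. 44.
/// // SAFETY: `ptr` is a live row in `table` and `col_id` is a `u8` column of its row type.
/// unsafe { table.write_gen_val_to_col(col_id, ptr, 300) };
/// ```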
447 pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) {
448 let row_ty = self.inner.row_layout.product();
449 // SAFETY: Caller promised that `col_id` was a valid column.
450 let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) };
451 let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else {
452 // SAFETY: Columns with sequences must be primitive types.
453 unsafe { unreachable_unchecked() }
454 };
455
456 let fixed_row_size = self.inner.row_layout.size();
457 let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size);
458
459 fn write<const N: usize>(dst: &mut [u8], offset: u16, bytes: [u8; N]) {
460 let offset = offset as usize;
461 dst[offset..offset + N].copy_from_slice(&bytes);
462 }
463
464 match col_typ {
465 PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()),
466 PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()),
467 PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()),
468 PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()),
469 PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()),
470 PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()),
471 PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()),
472 PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()),
473 PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()),
474 PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()),
475 PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()),
476 PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()),
477 // SAFETY: Columns with sequences must be integer types.
478 PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => unsafe { unreachable_unchecked() },
479 }
480 }
481
482 /// Performs all the checks necessary after having fully decided on a row's contents.
483 ///
484 /// This includes inserting the row into any applicable indices and/or the pointer map.
485 ///
486 /// On `Ok(_)`, the table's statistics are updated
487 /// and `ptr` still points to a valid row; on `Err(_)`, it no longer does.
488 ///
489 /// # Safety
490 ///
491 /// `self.is_row_present(row)` must hold.
492 pub unsafe fn confirm_insertion<'a>(
493 &'a mut self,
494 blob_store: &'a mut dyn BlobStore,
495 ptr: RowPointer,
496 blob_bytes: BlobNumBytes,
497 ) -> Result<(Option<RowHash>, RowPointer), InsertError> {
498 // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
499 let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?;
500 // SAFETY: Caller promised that `self.is_row_present(ptr)` holds.
501 unsafe { self.insert_into_indices(blob_store, ptr) }?;
502
503 self.update_statistics_added_row(blob_bytes);
504 Ok((hash, ptr))
505 }
506
507 /// Confirms a row update, after first updating indices and checking constraints.
508 ///
509 /// On `Ok(_)`:
510 /// - the statistics of the table are also updated,
511 /// - the `ptr` still points to a valid row.
512 ///
513 /// Otherwise, on `Err(_)`:
514 /// - `ptr` will not point to a valid row,
515 /// - the statistics won't be updated.
516 ///
517 /// # Safety
518 ///
519 /// `self.is_row_present(new_ptr)` and `self.is_row_present(old_ptr)` must hold.
520 pub unsafe fn confirm_update<'a>(
521 &'a mut self,
522 blob_store: &'a mut dyn BlobStore,
523 new_ptr: RowPointer,
524 old_ptr: RowPointer,
525 blob_bytes_added: BlobNumBytes,
526 ) -> Result<RowPointer, UniqueConstraintViolation> {
527 // (1) Remove old row from indices.
528 // SAFETY: Caller promised that `self.is_row_present(old_ptr)` holds.
529 unsafe { self.delete_from_indices(blob_store, old_ptr) };
530
531 // Insert new row into indices.
532 // SAFETY: Caller promised that `self.is_row_present(new_ptr)` holds.
533 let res = unsafe { self.insert_into_indices(blob_store, new_ptr) };
534 if let Err(e) = res {
535 // Undo (1).
536 unsafe { self.insert_into_indices(blob_store, old_ptr) }
537 .expect("re-inserting the old row into indices should always work");
538 return Err(e);
539 }
540
541 // Remove the old row physically.
542 // SAFETY: The physical `old_ptr` still exists.
543 let blob_bytes_removed = unsafe { self.delete_internal_skip_pointer_map(blob_store, old_ptr) };
544 self.update_statistics_deleted_row(blob_bytes_removed);
545
546 // Update statistics.
547 self.update_statistics_added_row(blob_bytes_added);
548 Ok(new_ptr)
549 }
550
551 /// We've added a row, update the statistics to record this.
552 #[inline]
553 fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) {
554 self.row_count += 1;
555 self.blob_store_bytes += blob_bytes;
556 }
557
558 /// We've removed a row, update the statistics to record this.
559 #[inline]
560 fn update_statistics_deleted_row(&mut self, blob_bytes: BlobNumBytes) {
561 self.row_count -= 1;
562 self.blob_store_bytes -= blob_bytes;
563 }
564
565 /// Insert row identified by `ptr` into indices.
566 /// This also checks unique constraints.
567 /// Deletes the row if there were any violations.
568 ///
569 /// SAFETY: `self.is_row_present(row)` must hold.
570 /// Post-condition: If this method returns `Ok(_)`, the row still exists.
571 unsafe fn insert_into_indices<'a>(
572 &'a mut self,
573 blob_store: &'a mut dyn BlobStore,
574 ptr: RowPointer,
575 ) -> Result<(), UniqueConstraintViolation> {
576 let mut index_error = None;
577 for (&index_id, index) in self.indexes.iter_mut() {
578 // SAFETY: We just inserted `ptr`, so it must be present.
579 let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
580 // SAFETY: any index in this table was constructed with the same row type as this table.
581 let violation = unsafe { index.check_and_insert(row_ref) };
582 if violation.is_err() {
583 let cols = &index.indexed_columns;
584 let value = row_ref.project(cols).unwrap();
585 let error = UniqueConstraintViolation::build(&self.schema, index, index_id, value);
586 index_error = Some(error);
587 break;
588 }
589 }
590 if let Some(err) = index_error {
591 // Found unique constraint violation.
592 // Undo the insertion.
593 // SAFETY: We just inserted `ptr`, so it must be present.
594 unsafe {
595 self.delete_unchecked(blob_store, ptr);
596 }
597 return Err(err);
598 }
599 Ok(())
600 }
601
602 /// Finds the [`RowPointer`], if any, to the row in `target_table`
603 /// that is equal to the row `needle_ptr` in `needle_table`,
604 /// using any unique index in `target_table`.
605 ///
606 /// # Safety
607 ///
608 /// - `target_table` and `needle_table` must have the same `row_layout`.
609 /// - `needle_table.is_row_present(needle_ptr)` must hold.
610 unsafe fn find_same_row_via_unique_index(
611 target_table: &Table,
612 needle_table: &Table,
613 needle_bs: &dyn BlobStore,
614 needle_ptr: RowPointer,
615 ) -> Option<RowPointer> {
616 // Use some index (the one with the lowest `IndexId` currently).
617 // TODO(centril): this isn't what we actually want.
618 // Rather, we'd prefer the index with the simplest type,
619 // but this is left as future work as we don't have to optimize this method now.
620 let target_index = target_table
621 .indexes
622 .values()
623 .find(|idx| idx.is_unique())
624 .expect("there should be at least one unique index");
625 // Project the needle row to the columns of the index, and then seek.
626 // As this is a unique index, there are 0-1 rows for this key.
627 let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
628 let key = needle_row
629 .project(&target_index.indexed_columns)
630 .expect("needle row should be valid");
631 target_index.seek_point(&key).next().filter(|&target_ptr| {
632 // SAFETY:
633 // - Caller promised that the row layouts were the same.
634 // - We know `target_ptr` exists, as it was in `target_index`, belonging to `target_table`.
635 // - Caller promised that `needle_ptr` is valid for `needle_table`.
636 unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
637 })
638 }
639
640 /// Insert the row identified by `ptr` into the table's [`PointerMap`],
641 /// if the table has one.
642 ///
643 /// This checks for set semantic violations.
644 /// If a set semantic conflict (i.e. duplicate row) is detected by the pointer map,
645 /// the row will be deleted and an error returned.
646 /// If the pointer map confirms that the row was unique, returns the `RowHash` of that row.
647 ///
648 /// If this table has no `PointerMap`, returns `Ok(None)`.
649 /// In that case, the row's uniqueness will be verified by [`Self::insert_into_indices`],
650 /// as this table has at least one unique index.
651 ///
652 /// SAFETY: `self.is_row_present(row)` must hold.
653 /// Post-condition: If this method returns `Ok(_)`, the row still exists.
654 unsafe fn insert_into_pointer_map<'a>(
655 &'a mut self,
656 blob_store: &'a mut dyn BlobStore,
657 ptr: RowPointer,
658 ) -> Result<Option<RowHash>, DuplicateError> {
659 if self.pointer_map.is_none() {
660 // No pointer map? Set semantic constraint is checked by a unique index instead.
661 return Ok(None);
662 };
663
664 // SAFETY:
665 // - `self` trivially has the same `row_layout` as `self`.
666 // - Caller promised that `self.is_row_present(row)` holds.
667 let (hash, existing_row) = unsafe { Self::find_same_row_via_pointer_map(self, self, blob_store, ptr, None) };
668
669 if let Some(existing_row) = existing_row {
670 // If an equal row was already present,
671 // roll back our optimistic insert to avoid violating set semantics.
672
673 // SAFETY: Caller promised that `ptr` is a valid row in `self`.
674 unsafe {
675 self.inner
676 .pages
677 .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
678 };
679 return Err(DuplicateError(existing_row));
680 }
681
682 // If the optimistic insertion was correct,
683 // i.e. this is not a set-semantic duplicate,
684 // add it to the `pointer_map`.
685 self.pointer_map
686 .as_mut()
687 .expect("pointer map should exist, as it did previously")
688 .insert(hash, ptr);
689
690 Ok(Some(hash))
691 }
692
693 /// Returns the list of pointers to rows which hash to `row_hash`.
694 ///
695 /// If `self` does not have a [`PointerMap`], always returns the empty slice.
696 fn pointers_for(&self, row_hash: RowHash) -> &[RowPointer] {
697 self.pointer_map.as_ref().map_or(&[], |pm| pm.pointers_for(row_hash))
698 }
699
700 /// Using the [`PointerMap`],
701 /// searches `target_table` for a row equal to `needle_table[needle_ptr]`.
702 ///
703 /// Rows are compared for equality by [`eq_row_in_page`].
704 ///
705 /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
706 ///
707 /// Used for detecting set-semantic duplicates when inserting
708 /// into tables without any unique constraints.
709 ///
710 /// Does nothing and always returns `None` if `target_table` does not have a `PointerMap`,
711 /// in which case the caller should instead use [`Self::find_same_row_via_unique_index`].
712 ///
713 /// Note that we don't need the blob store to compute equality,
714 /// as content-addressing means it's sufficient to compare the hashes of large blobs.
715 /// (If we see a collision in `BlobHash` we have bigger problems.)
716 ///
717 /// # Safety
718 ///
719 /// - `target_table` and `needle_table` must have the same `row_layout`.
720 /// - `needle_table.is_row_present(needle_ptr)`.
721 pub unsafe fn find_same_row_via_pointer_map(
722 target_table: &Table,
723 needle_table: &Table,
724 needle_bs: &dyn BlobStore,
725 needle_ptr: RowPointer,
726 row_hash: Option<RowHash>,
727 ) -> (RowHash, Option<RowPointer>) {
728 let row_hash = row_hash.unwrap_or_else(|| {
729 // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
730 let row_ref = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
731 row_ref.row_hash()
732 });
733
734 // Scan all the row pointers with `row_hash` in the `target_table`.
735 let row_ptr = target_table.pointers_for(row_hash).iter().copied().find(|&target_ptr| {
736 // SAFETY:
737 // - Caller promised that the row layouts were the same.
738 // - We know `target_ptr` exists, as it was found in a pointer map.
739 // - Caller promised that `needle_ptr` is valid for `needle_table`.
740 unsafe { Self::eq_row_in_page(target_table, target_ptr, needle_table, needle_ptr) }
741 });
742
743 (row_hash, row_ptr)
744 }
745
746 /// Returns whether the row `target_ptr` in `target_table`
747 /// is exactly equal to the row `needle_ptr` in `needle_table`.
748 ///
749 /// # Safety
750 ///
751 /// - `target_table` and `needle_table` must have the same `row_layout`.
752 /// - `target_table.is_row_present(target_ptr)`.
753 /// - `needle_table.is_row_present(needle_ptr)`.
754 pub unsafe fn eq_row_in_page(
755 target_table: &Table,
756 target_ptr: RowPointer,
757 needle_table: &Table,
758 needle_ptr: RowPointer,
759 ) -> bool {
760 let (target_page, target_offset) = target_table.inner.page_and_offset(target_ptr);
761 let (needle_page, needle_offset) = needle_table.inner.page_and_offset(needle_ptr);
762
763 // SAFETY:
764 // - Caller promised that `target_ptr` is valid, so `target_page` and `target_offset` are both valid.
765 // - Caller promised that `needle_ptr` is valid, so `needle_page` and `needle_offset` are both valid.
766 // - Caller promised that the layouts of `target_table` and `needle_table` are the same,
767 // so `target_table` applies to both.
768 // Moreover `(x: Table).inner.static_layout` is always derived from `x.row_layout`.
769 unsafe {
770 eq_row_in_page(
771 target_page,
772 needle_page,
773 target_offset,
774 needle_offset,
775 &target_table.inner.row_layout,
776 target_table.static_layout(),
777 )
778 }
779 }
780
781 /// Searches `target_table` for a row equal to `needle_table[needle_ptr]`,
782 /// and returns the [`RowPointer`] to that row in `target_table`, if it exists.
783 ///
784 /// Searches using the [`PointerMap`] or a unique index, as appropriate for the table.
785 ///
786 /// Lazily computes the row hash if needed and returns it, or uses the one provided, if any.
787 ///
788 /// # Safety
789 ///
790 /// - `target_table` and `needle_table` must have the same `row_layout`.
791 /// - `needle_table.is_row_present(needle_ptr)` must hold.
792 pub unsafe fn find_same_row(
793 target_table: &Table,
794 needle_table: &Table,
795 needle_bs: &dyn BlobStore,
796 needle_ptr: RowPointer,
797 row_hash: Option<RowHash>,
798 ) -> (Option<RowHash>, Option<RowPointer>) {
799 if target_table.pointer_map.is_some() {
800 // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
801 // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
802 let (row_hash, row_ptr) = unsafe {
803 Self::find_same_row_via_pointer_map(target_table, needle_table, needle_bs, needle_ptr, row_hash)
804 };
805 (Some(row_hash), row_ptr)
806 } else {
807 (
808 row_hash,
809 // SAFETY: Caller promised that `target_table` and `needle_table` have the same `row_layout`.
810 // SAFETY: Caller promised that `needle_table.is_row_present(needle_ptr)`.
811 unsafe { Self::find_same_row_via_unique_index(target_table, needle_table, needle_bs, needle_ptr) },
812 )
813 }
814 }
815
816 /// Returns a [`RowRef`] for `ptr` or `None` if the row isn't present.
817 pub fn get_row_ref<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> Option<RowRef<'a>> {
818 self.is_row_present(ptr)
819 // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
820 .then(|| unsafe { self.get_row_ref_unchecked(blob_store, ptr) })
821 }
822
823 /// Assumes `ptr` is a present row in `self` and returns a [`RowRef`] to it.
824 ///
825 /// # Safety
826 ///
827 /// The requirement is that `self.is_row_present(ptr)` must hold.
828 /// That is, `ptr` must refer to a row within `self`
829 /// which was previously inserted and has not been deleted since.
830 ///
831 /// This means:
832 /// - The `PageIndex` of `ptr` must be in-bounds for `self.pages`.
833 /// - The `PageOffset` of `ptr` must be properly aligned for the row type of `self`,
834 /// and must refer to a valid, live row in that page.
835 /// - The `SquashedOffset` of `ptr` must match `self.squashed_offset`.
836 ///
837 /// Showing that `ptr` was the result of a call to [`Table::insert(table, ..)`]
838 /// and has not been passed to [`Table::delete(table, ..)`]
839 /// is sufficient to demonstrate all of these properties.
840 pub unsafe fn get_row_ref_unchecked<'a>(&'a self, blob_store: &'a dyn BlobStore, ptr: RowPointer) -> RowRef<'a> {
841 debug_assert!(self.is_row_present(ptr));
842 // SAFETY: Caller promised that ^-- holds.
843 unsafe { RowRef::new(&self.inner, blob_store, ptr) }
844 }
845
846 /// Deletes a row in the page manager
847 /// without deleting it logically in the pointer map.
848 ///
849 /// # Safety
850 ///
851 /// `ptr` must point to a valid, live row in this table.
852 pub unsafe fn delete_internal_skip_pointer_map(
853 &mut self,
854 blob_store: &mut dyn BlobStore,
855 ptr: RowPointer,
856 ) -> BlobNumBytes {
857 // Delete the physical row.
858 //
859 // SAFETY:
860 // - `ptr` points to a valid row in this table, per our invariants.
861 // - `self.row_size` known to be consistent with `self.pages`,
862 // as the two are tied together in `Table::new`.
863 unsafe {
864 self.inner
865 .pages
866 .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
867 }
868 }
869
870 /// Deletes the row identified by `ptr` from the table.
871 ///
872 /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
873 ///
874 /// NOTE: This method skips updating indexes.
875 /// Use `delete_unchecked` or `delete` to delete a row with index updating.
876 ///
877 /// SAFETY: `self.is_row_present(row)` must hold.
878 unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
879 // Remove the set semantic association.
880 if let Some(pointer_map) = &mut self.pointer_map {
881 // SAFETY: `self.is_row_present(row)` holds.
882 let row = unsafe { RowRef::new(&self.inner, blob_store, ptr) };
883
884 let _remove_result = pointer_map.remove(row.row_hash(), ptr);
885 debug_assert!(_remove_result);
886 }
887
888 // Delete the physical row.
889 // SAFETY: `ptr` points to a valid row in this table as `self.is_row_present(row)` holds.
890 unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) }
891 }
892
893 /// Deletes the row identified by `ptr` from the table.
894 ///
895 /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
896 ///
897 /// SAFETY: `self.is_row_present(row)` must hold.
898 unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
899 // Delete row from indices.
900 // Do this before the actual deletion, as `index.delete` needs a `RowRef`
901 // so it can extract the appropriate value.
902 // SAFETY: Caller promised that `self.is_row_present(row)` holds.
903 unsafe { self.delete_from_indices(blob_store, ptr) };
904
905 // SAFETY: Caller promised that `self.is_row_present(row)` holds.
906 unsafe { self.delete_internal(blob_store, ptr) }
907 }
908
909 /// Deletes the row identified by `ptr` from all the indices of this table.
910 ///
911 /// SAFETY: `self.is_row_present(row)` must hold.
912 unsafe fn delete_from_indices(&mut self, blob_store: &dyn BlobStore, ptr: RowPointer) {
913 // SAFETY: Caller promised that `self.is_row_present(row)` holds.
914 let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
915
916 for index in self.indexes.values_mut() {
917 index.delete(row_ref).unwrap();
918 }
919 }
920
921 /// Deletes the row identified by `ptr` from the table.
922 ///
923 /// The function `before` is run on the to-be-deleted row,
924 /// if it is present, before deleting.
925 /// This enables callers to extract the deleted row.
926 /// E.g. applying deletes when squashing/merging a transaction into the committed state
927 /// passes `|row| row.to_product_value()` as `before`
928 /// so that the resulting `ProductValue`s can be passed to the subscription evaluator.
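///
/// A usage sketch (not a doc-test; `ptr` and `blob_store` are assumed):
///
/// ```ignore
/// // Capture the deleted row as a `ProductValue`, as the commit path does.
/// let deleted: Option<ProductValue> =
///     table.delete(&mut blob_store, ptr, |row| row.to_product_value());
/// ```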
929 pub fn delete<'a, R>(
930 &'a mut self,
931 blob_store: &'a mut dyn BlobStore,
932 ptr: RowPointer,
933 before: impl for<'b> FnOnce(RowRef<'b>) -> R,
934 ) -> Option<R> {
935 if !self.is_row_present(ptr) {
936 return None;
937 };
938
939 // SAFETY: We only call `get_row_ref_unchecked` when `is_row_present` holds.
940 let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
941
942 let ret = before(row_ref);
943
944 // SAFETY: We've checked above that `self.is_row_present(ptr)`.
945 let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) };
946 self.update_statistics_deleted_row(blob_bytes_deleted);
947
948 Some(ret)
949 }
950
951 /// If a row exists in `self` which matches `row`
952 /// by [`Table::find_same_row`],
953 /// delete that row.
954 ///
955 /// If a matching row was found, returns the pointer to that row.
956 /// The returned pointer is now invalid, as the row to which it referred has been deleted.
957 ///
958 /// This operation works by temporarily inserting the `row` into `self`,
959 /// checking `find_same_row` on the newly-inserted row,
960 /// deleting the matching row if it exists,
961 /// then deleting the temporary insertion.
962 pub fn delete_equal_row(
963 &mut self,
964 blob_store: &mut dyn BlobStore,
965 row: &ProductValue,
966 ) -> Result<Option<RowPointer>, Error> {
967 // Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row.
968 // This must avoid consulting and inserting to the pointer map,
969 // as the row is already present, set-semantically.
970 let (temp_row, _) = self.insert_physically_pv(blob_store, row)?;
971 let temp_ptr = temp_row.pointer();
972
973 // Find the row equal to the passed-in `row`.
974 // This uses one of two approaches.
975 // Either there is a pointer map, so we use that,
976 // or there is at least one unique index, so we use one of them.
977 //
978 // SAFETY:
979 // - `self` trivially has the same `row_layout` as `self`.
980 // - We just inserted `temp_ptr`, so it's valid.
981 let (_, existing_row_ptr) = unsafe { Self::find_same_row(self, self, blob_store, temp_ptr, None) };
982
983 // If an equal row was present, delete it.
984 if let Some(existing_row_ptr) = existing_row_ptr {
985 let blob_bytes_deleted = unsafe {
986 // SAFETY: `find_same_row` ensures that the pointer is valid.
987 self.delete_unchecked(blob_store, existing_row_ptr)
988 };
989 self.update_statistics_deleted_row(blob_bytes_deleted);
990 }
991
992 // Remove the temporary row we inserted in the beginning.
993 // Avoid the pointer map, since we don't want to delete it twice.
994 // SAFETY: `ptr` is valid as we just inserted it.
995 unsafe {
996 self.delete_internal_skip_pointer_map(blob_store, temp_ptr);
997 }
998
999 Ok(existing_row_ptr)
1000 }
1001
1002 /// Returns the row type for rows in this table.
1003 pub fn get_row_type(&self) -> &ProductType {
1004 self.get_schema().get_row_type()
1005 }
1006
1007 /// Returns the schema for this table.
1008 pub fn get_schema(&self) -> &Arc<TableSchema> {
1009 &self.schema
1010 }
1011
1012 /// Runs a mutation on the [`TableSchema`] of this table.
1013 ///
1014 /// This uses a clone-on-write mechanism.
1015 /// If none but `self` refers to the schema, then the mutation will be in-place.
1016 /// Otherwise, the schema must be cloned, mutated,
1017 /// and then the cloned version is written back to the table.
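///
/// A usage sketch (this is the same pattern [`Self::delete_index`] uses internally):
///
/// ```ignore
/// table.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
/// ```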
1018 pub fn with_mut_schema(&mut self, with: impl FnOnce(&mut TableSchema)) {
1019 with(Arc::make_mut(&mut self.schema));
1020 }
1021
1022 /// Returns a new [`TableIndex`] for this table.
1023 pub fn new_index(&self, algo: &IndexAlgorithm, is_unique: bool) -> Result<TableIndex, InvalidFieldError> {
1024 TableIndex::new(self.get_schema().get_row_type(), algo, is_unique)
1025 }
1026
1027 /// Inserts a new `index` into the table.
1028 ///
1029 /// The index will be populated using the rows of the table.
1030 ///
1031 /// # Panics
1032 ///
1033 /// Panics if any row would violate `index`'s unique constraint, if it has one.
1034 ///
1035 /// # Safety
1036 ///
1037 /// Caller must promise that `index` was constructed with the same row type/layout as this table.
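///
/// A hedged sketch of creating and installing an index (`algo`, `index_id`, and
/// `blob_store` are assumptions for illustration):
///
/// ```ignore
/// let index = table.new_index(&algo, /* is_unique */ true)?;
/// // SAFETY: `index` was just built from this table's own row type.
/// unsafe { table.insert_index(&blob_store, index_id, index) };
/// ```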
1038 pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) {
1039 let rows = self.scan_rows(blob_store);
1040 // SAFETY: Caller promised that table's row type/layout
1041 // matches that which `index` was constructed with.
1042 // It follows that this applies to any `rows`, as required.
1043 let violation = unsafe { index.build_from_rows(rows) };
1044 violation.unwrap_or_else(|ptr| {
1045 panic!("adding `index` should cause no unique constraint violations, but {ptr:?} would")
1046 });
1047 // SAFETY: Forward caller requirement.
1048 unsafe { self.add_index(index_id, index) };
1049 }
1050
1051 /// Adds an index to the table without populating it.
1052 ///
1053 /// # Safety
1054 ///
1055 /// Caller must promise that `index` was constructed with the same row type/layout as this table.
1056 pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) {
1057 let is_unique = index.is_unique();
1058 self.indexes.insert(index_id, index);
1059
1060 // Remove the pointer map, if any.
1061 if is_unique {
1062 self.pointer_map = None;
1063 }
1064 }
1065
1066 /// Removes an index from the table.
1067 ///
1068 /// Returns whether an index existed with `index_id`.
1069 pub fn delete_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId) -> bool {
1070 let index = self.indexes.remove(&index_id);
1071 let ret = index.is_some();
1072
1073 // If we removed the last unique index, add a pointer map.
1074 if index.is_some_and(|i| i.is_unique()) && !self.indexes.values().any(|idx| idx.is_unique()) {
1075 self.rebuild_pointer_map(blob_store);
1076 }
1077
1078 // Remove index from schema.
1079 //
1080 // This will likely do a clone-and-write, since over time
1081 // the schema may have acquired other referents.
1082 self.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
1083
1084 ret
1085 }
1086
1087 /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s.
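///
/// A usage sketch:
///
/// ```ignore
/// for row_ref in table.scan_rows(&blob_store) {
///     let pv = row_ref.to_product_value();
///     // ... inspect `pv` ...
/// }
/// ```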
1088 pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> {
1089 TableScanIter {
1090 current_page: None, // Will be filled by the iterator.
1091 current_page_idx: PageIndex(0),
1092 table: self,
1093 blob_store,
1094 }
1095 }
1096
1097 /// Returns this table combined with the index for [`IndexId`], if any.
1098 pub fn get_index_by_id_with_table<'a>(
1099 &'a self,
1100 blob_store: &'a dyn BlobStore,
1101 index_id: IndexId,
1102 ) -> Option<TableAndIndex<'a>> {
1103 Some(TableAndIndex {
1104 table: self,
1105 blob_store,
1106 index: self.get_index_by_id(index_id)?,
1107 })
1108 }
1109
1110 /// Returns the [`TableIndex`] for this [`IndexId`].
1111 pub fn get_index_by_id(&self, index_id: IndexId) -> Option<&TableIndex> {
1112 self.indexes.get(&index_id)
1113 }
1114
1115 /// Returns this table combined with the first index with `cols`, if any.
1116 pub fn get_index_by_cols_with_table<'a>(
1117 &'a self,
1118 blob_store: &'a dyn BlobStore,
1119 cols: &ColList,
1120 ) -> Option<TableAndIndex<'a>> {
1121 let (_, index) = self.get_index_by_cols(cols)?;
1122 Some(TableAndIndex {
1123 table: self,
1124 blob_store,
1125 index,
1126 })
1127 }
1128
1129 /// Returns the first [`TableIndex`] with the given [`ColList`].
1130 pub fn get_index_by_cols(&self, cols: &ColList) -> Option<(IndexId, &TableIndex)> {
1131 self.indexes
1132 .iter()
1133 .find(|(_, index)| &index.indexed_columns == cols)
1134 .map(|(id, idx)| (*id, idx))
1135 }
1136
1137 /// Clones the structure of this table into a new one with
1138 /// the same schema, visitor program, and indices.
1139 /// The new table will be completely empty
1140 /// and will use the given `squashed_offset` instead of that of `self`.
1141 pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self {
1142 let schema = self.schema.clone();
1143 let layout = self.row_layout().clone();
1144 let sbl = self.inner.static_layout.clone();
1145 let visitor = self.inner.visitor_prog.clone();
1146 // If we had a pointer map, we'll have one in the cloned one as well, but empty.
1147 let pm = self.pointer_map.as_ref().map(|_| PointerMap::default());
1148 let mut new = Table::new_with_indexes_capacity(schema, layout, sbl, visitor, squashed_offset, pm);
1149
1150 // Clone the index structure. The table is empty, so no need to `build_from_rows`.
1151 for (&index_id, index) in self.indexes.iter() {
1152 new.indexes.insert(index_id, index.clone_structure());
1153 }
1154 new
1155 }
1156
1157 /// Returns the number of bytes occupied by the pages and the blob store.
1158 /// Note that the result can be more than the actual physical size occupied by the table
1159 /// because the blob store implementation can do internal optimizations.
1160 /// For more details, refer to the documentation of `self.blob_store_bytes`.
1161 pub fn bytes_occupied_overestimate(&self) -> usize {
1162 (self.num_pages() * PAGE_DATA_SIZE) + (self.blob_store_bytes.0)
1163 }
1164
1165 /// Reset the internal storage of `self` to be `pages`.
1166 ///
1167 /// This recomputes the pointer map based on the `pages`,
1168 /// but does not recompute indexes.
1169 ///
1170 /// Used when restoring from a snapshot.
1171 ///
1172 /// # Safety
1173 ///
1174 /// The schema of rows stored in the `pages` must exactly match `self.schema` and `self.inner.row_layout`.
1175 pub unsafe fn set_pages(&mut self, pages: Vec<Box<Page>>, blob_store: &dyn BlobStore) {
1176 self.inner.pages.set_contents(pages, self.inner.row_layout.size());
1177
1178 // Recompute table metadata based on the new pages.
1179 // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate.
1180 self.compute_row_count(blob_store);
1181 self.rebuild_pointer_map(blob_store);
1182 }
1183
1184 /// Returns the number of rows resident in this table.
1185 ///
1186 /// This scales in runtime with the number of pages in the table.
1187 pub fn num_rows(&self) -> u64 {
1188 self.pages().iter().map(|page| page.num_rows() as u64).sum()
1189 }
1190
1191 #[cfg(test)]
1192 fn reconstruct_num_rows(&self) -> u64 {
1193 self.pages().iter().map(|page| page.reconstruct_num_rows() as u64).sum()
1194 }
1195
1196 /// Returns the number of bytes used by rows resident in this table.
1197 ///
1198 /// This includes data bytes, padding bytes and some overhead bytes,
1199 /// as described in the docs for [`Page::bytes_used_by_rows`],
1200 /// but *does not* include:
1201 ///
1202 /// - Unallocated space within pages.
1203 /// - Per-page overhead (e.g. page headers).
1204 /// - Table overhead (e.g. the [`RowTypeLayout`], [`PointerMap`], [`Schema`] &c).
1205 /// - Indexes.
1206 /// - Large blobs in the [`BlobStore`].
1207 ///
1208 /// Of these, the caller should inspect the blob store in order to account for memory usage by large blobs,
1209 /// and call [`Self::bytes_used_by_index_keys`] to account for indexes,
1210 /// but we intend to eat all the other overheads when billing.
1211 pub fn bytes_used_by_rows(&self) -> u64 {
1212 self.pages()
1213 .iter()
1214 .map(|page| page.bytes_used_by_rows(self.inner.row_layout.size()) as u64)
1215 .sum()
1216 }
1217
1218 #[cfg(test)]
1219 fn reconstruct_bytes_used_by_rows(&self) -> u64 {
1220 self.pages()
1221 .iter()
1222 .map(|page| unsafe {
1223 // SAFETY: `page` is in `self`, and was constructed using `self.inner.row_layout` and `self.inner.visitor_prog`,
1224 // so the three are mutually consistent.
1225 page.reconstruct_bytes_used_by_rows(self.inner.row_layout.size(), &self.inner.visitor_prog)
1226 } as u64)
1227 .sum()
1228 }
1229
1230 /// Returns the number of rows (or [`RowPointer`]s, more accurately)
1231 /// stored in indexes by this table.
1232 ///
1233 /// Like [`Self::num_rows`], which it calls, this scales in runtime with the number of pages in the table.
1234 pub fn num_rows_in_indexes(&self) -> u64 {
1235 // Assume that each index contains all rows in the table.
1236 self.num_rows() * self.indexes.len() as u64
1237 }
1238
1239 /// Returns the number of bytes used by keys stored in indexes by this table.
1240 ///
1241 /// This method scales in runtime with the number of indexes in the table,
1242 /// but not with the number of pages or rows.
1243 ///
1244 /// Key size is measured using a metric called "key size" or "data size,"
1245 /// which is intended to capture the number of live user-supplied bytes,
1246 /// not including representational overhead.
1247 /// This is distinct from the BFLATN size measured by [`Self::bytes_used_by_rows`].
1248 /// See the trait [`crate::btree_index::KeySize`] for specifics on the metric measured.
1249 pub fn bytes_used_by_index_keys(&self) -> u64 {
1250 self.indexes.values().map(|idx| idx.num_key_bytes()).sum()
1251 }
1252}
1253
1254/// A reference to a single row within a table.
1255///
1256/// # Safety
1257///
1258/// Having a `r: RowRef` is a proof that [`r.pointer()`](RowRef::pointer) refers to a valid row.
1259/// This makes constructing a `RowRef`, i.e., `RowRef::new`, an `unsafe` operation.
1260#[derive(Copy, Clone)]
1261pub struct RowRef<'a> {
1262 /// The table that has the row at `self.pointer`.
1263 table: &'a TableInner,
1264 /// The blob store used in case there are blob hashes to resolve.
1265 blob_store: &'a dyn BlobStore,
1266 /// The pointer to the row in `self.table`.
1267 pointer: RowPointer,
1268}
1269
1270impl fmt::Debug for RowRef<'_> {
1271 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1272 fmt.debug_struct("RowRef")
1273 .field("pointer", &self.pointer)
1274 .finish_non_exhaustive()
1275 }
1276}
1277
1278impl<'a> RowRef<'a> {
1279 /// Construct a `RowRef` to the row at `pointer` within `table`.
1280 ///
1281 /// # Safety
1282 ///
1283 /// `pointer` must refer to a row within `table`
1284 /// which was previously inserted and has not been deleted since.
1285 ///
1286 /// This means:
1287 /// - The `PageIndex` of `pointer` must be in-bounds for `table.pages`.
1288 /// - The `PageOffset` of `pointer` must be properly aligned for the row type of `table`,
1289 /// and must refer to a valid, live row in that page.
1290 /// - The `SquashedOffset` of `pointer` must match `table.squashed_offset`.
1291 ///
1292 /// Showing that `pointer` was the result of a call to `table.insert`
1293 /// and has not been passed to `table.delete`
1294 /// is sufficient to demonstrate all of these properties.
1295 unsafe fn new(table: &'a TableInner, blob_store: &'a dyn BlobStore, pointer: RowPointer) -> Self {
1296 Self {
1297 table,
1298 blob_store,
1299 pointer,
1300 }
1301 }
1302
1303 /// Extract a `ProductValue` from the table.
1304 ///
1305 /// This is a potentially expensive operation,
1306 /// as it must walk the table's `ProductTypeLayout`
1307 /// and heap-allocate various substructures of the `ProductValue`.
1308 pub fn to_product_value(&self) -> ProductValue {
1309 let res = self
1310 .serialize(ValueSerializer)
1311 .unwrap_or_else(|x| match x {})
1312 .into_product();
1313 // SAFETY: the top layer of a row when serialized is always a product.
1314 unsafe { res.unwrap_unchecked() }
1315 }
1316
1317 /// Check that the `col`th column of the row type stored by `self` is compatible with `T`,
1318 /// and read the value of that column from `self`.
1319 #[inline]
1320 pub fn read_col<T: ReadColumn>(self, col: impl Into<ColId>) -> Result<T, TypeError> {
1321 T::read_column(self, col.into().idx())
1322 }
1323
1324 /// Construct a projection of the row at `self` by extracting the `cols`.
1325 ///
1326 /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1327 /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
1328 ///
1329 /// # Safety
1330 ///
1331 /// - `cols` must not specify any column which is out-of-bounds for the row `self`.
1332 pub unsafe fn project_unchecked(self, cols: &ColList) -> AlgebraicValue {
1333 let col_layouts = &self.row_layout().product().elements;
1334
1335 if let Some(head) = cols.as_singleton() {
1336 let head = head.idx();
1337 // SAFETY: caller promised that `head` is in-bounds of `col_layouts`.
1338 let col_layout = unsafe { col_layouts.get_unchecked(head) };
1339 // SAFETY:
1340 // - `col_layout` was just derived from the row layout.
1341 // - `AlgebraicValue` is compatible with any `col_layout`.
1342 // - `self` is a valid row and offsetting to `col_layout` is valid.
1343 return unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) };
1344 }
1345 let mut elements = Vec::with_capacity(cols.len() as usize);
1346 for col in cols.iter() {
1347 let col = col.idx();
1348 // SAFETY: caller promised that any `col` is in-bounds of `col_layouts`.
1349 let col_layout = unsafe { col_layouts.get_unchecked(col) };
1350 // SAFETY:
1351 // - `col_layout` was just derived from the row layout.
1352 // - `AlgebraicValue` is compatible with any `col_layout`.
1353 // - `self` is a valid row and offsetting to `col_layout` is valid.
1354 elements.push(unsafe { AlgebraicValue::unchecked_read_column(self, col_layout) });
1355 }
1356 AlgebraicValue::product(elements)
1357 }
1358
1359 /// Construct a projection of the row at `self` by extracting the `cols`.
1360 ///
1361 /// Returns an error if `cols` specifies an index which is out-of-bounds for the row at `self`.
1362 ///
1363 /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
1364 /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`.
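///
/// A hedged illustration of the shape of the result (`single` names one column,
/// `pair` names two; both are assumed to be `ColList`s valid for this row):
///
/// ```ignore
/// // One column: the bare value of that column.
/// let value = row_ref.project(&single)?;
/// // Two columns: the values wrapped in an `AlgebraicValue::Product`.
/// let product = row_ref.project(&pair)?;
/// ```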
1365 pub fn project(self, cols: &ColList) -> Result<AlgebraicValue, InvalidFieldError> {
1366 if let Some(head) = cols.as_singleton() {
1367 return self.read_col(head).map_err(|_| head.into());
1368 }
1369 let mut elements = Vec::with_capacity(cols.len() as usize);
1370 for col in cols.iter() {
1371 let col_val = self.read_col(col).map_err(|err| match err {
1372 TypeError::WrongType { .. } => {
1373 unreachable!("AlgebraicValue::read_column never returns a `TypeError::WrongType`")
1374 }
1375 TypeError::IndexOutOfBounds { .. } => col,
1376 })?;
1377 elements.push(col_val);
1378 }
1379 Ok(AlgebraicValue::product(elements))
1380 }
1381
1382 /// Returns the raw row pointer for this row reference.
1383 pub fn pointer(&self) -> RowPointer {
1384 self.pointer
1385 }
1386
1387 /// Returns the blob store that any [`crate::blob_store::BlobHash`]es within the row refer to.
1388 pub(crate) fn blob_store(&self) -> &dyn BlobStore {
1389 self.blob_store
1390 }
1391
1392 /// Return the layout of the row.
1393 ///
1394 /// All rows within the same table will have the same layout.
1395 pub fn row_layout(&self) -> &RowTypeLayout {
1396 &self.table.row_layout
1397 }
1398
1399 /// Returns the page the row is in and the offset of the row within that page.
1400 pub fn page_and_offset(&self) -> (&Page, PageOffset) {
1401 self.table.page_and_offset(self.pointer())
1402 }
1403
1404 /// Returns the bytes for the fixed portion of this row.
1405 pub(crate) fn get_row_data(&self) -> &Bytes {
1406 let (page, offset) = self.page_and_offset();
1407 page.get_row_data(offset, self.table.row_layout.size())
1408 }
1409
1410 /// Returns the hash of the row referred to by `self`.
1411 pub fn row_hash(&self) -> RowHash {
1412 RowHash(RowHash::hasher_builder().hash_one(self))
1413 }
1414
1415 /// Returns the static layout for this row reference, if any.
1416 pub fn static_layout(&self) -> Option<&StaticLayout> {
1417 self.table.static_layout.as_ref().map(|(s, _)| s)
1418 }
1419
1420 /// Encode the row referred to by `self` into a `Vec<u8>` using BSATN and then deserialize it.
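///
/// # Example (sketch)
///
/// A hedged sketch; `row_ref` and `MyRow` are hypothetical. `MyRow` stands in for any
/// type implementing `DeserializeOwned` whose layout matches the table's row type:
///
/// ```ignore
/// let mut scratch = Vec::new();
/// let decoded: MyRow = row_ref.read_via_bsatn(&mut scratch)?;
/// ```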
1421 pub fn read_via_bsatn<T>(&self, scratch: &mut Vec<u8>) -> Result<T, ReadViaBsatnError>
1422 where
1423 T: DeserializeOwned,
1424 {
1425 self.to_bsatn_extend(scratch)?;
1426 Ok(bsatn::from_slice::<T>(scratch)?)
1427 }
1428
1429 /// Return the number of bytes in the blob store to which this object holds a reference.
1430 ///
1431 /// Used to compute the table's `blob_store_bytes` when reconstructing a snapshot.
1432 ///
1433 /// Even within a single row, this is a conservative overestimate,
1434 /// as a row may contain multiple references to the same large blob.
1435 /// This seems unlikely to occur in practice.
1436 fn blob_store_bytes(&self) -> usize {
1437 let row_data = self.get_row_data();
1438 let (page, _) = self.page_and_offset();
1439 // SAFETY:
1440 // - Existence of a `RowRef` is treated as proof
1441 // of the row's validity and the correctness of its type information.
1442 unsafe { self.table.visitor_prog.visit_var_len(row_data) }
1443 .filter(|vlr| vlr.is_large_blob())
1444 .map(|vlr| {
1445 // SAFETY:
1446 // - Because `vlr.is_large_blob`, it points to exactly one granule.
1447 let granule = unsafe { page.iter_var_len_object(vlr.first_granule) }.next().unwrap();
1448 let blob_hash = granule.blob_hash();
1449 let blob = self.blob_store.retrieve_blob(&blob_hash).unwrap();
1450
1451 blob.len()
1452 })
1453 .sum()
1454 }
1455}
1456
1457impl Serialize for RowRef<'_> {
1458 fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
1459 let table = self.table;
1460 let (page, offset) = table.page_and_offset(self.pointer);
1461 // SAFETY: the existence of `self` proves that `self.pointer` refers to a valid row in this table.
1462 unsafe { serialize_row_from_page(ser, page, self.blob_store, offset, &table.row_layout) }
1463 }
1464}
1465
1466impl ToBsatn for RowRef<'_> {
1467 /// BSATN-encode the row referred to by `self` into a freshly-allocated `Vec<u8>`.
1468 ///
1469 /// This method will use a [`StaticLayout`] if one is available,
1470 /// and may therefore be faster than calling [`bsatn::to_vec`].
1471 fn to_bsatn_vec(&self) -> Result<Vec<u8>, BsatnError> {
1472 if let Some(static_layout) = self.static_layout() {
1473 // Use fast path, by first fetching the row data and then using the static layout.
1474 let row = self.get_row_data();
1475 // SAFETY:
1476 // - Existence of a `RowRef` is treated as proof
1477 // of the row's validity and the correctness of its type information.
1478 Ok(unsafe { static_layout.serialize_row_into_vec(row) })
1479 } else {
1480 bsatn::to_vec(self)
1481 }
1482 }
1483
1484 /// BSATN-encode the row referred to by `self` into `buf`,
1485 /// pushing `self`'s bytes onto the end of `buf`, similar to [`Vec::extend`].
1486 ///
1487 /// This method will use a [`StaticLayout`] if one is available,
1488 /// and may therefore be faster than calling [`bsatn::to_writer`].
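///
/// # Example (sketch)
///
/// A hedged sketch of batching several rows into one buffer; `table` and `blob_store`
/// are hypothetical, with `scan_rows` yielding `RowRef`s as elsewhere in this module:
///
/// ```ignore
/// let mut buf = Vec::new();
/// for row_ref in table.scan_rows(&blob_store) {
///     row_ref.to_bsatn_extend(&mut buf)?;
/// }
/// ```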
1489 fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> Result<(), BsatnError> {
1490 if let Some(static_layout) = self.static_layout() {
1491 // Use fast path, by first fetching the row data and then using the static layout.
1492 let row = self.get_row_data();
1493 // - Existence of a `RowRef` is treated as proof
1494 // of the row's validity and the correctness of its type information.
1495 // of row's validity and type information's correctness.
1496 unsafe {
1497 static_layout.serialize_row_extend(buf, row);
1498 }
1499 Ok(())
1500 } else {
1501 // Use the slower, but more general, `bsatn_from` serializer to write the row.
1502 bsatn::to_writer(buf, self)
1503 }
1504 }
1505
1506 fn static_bsatn_size(&self) -> Option<u16> {
1507 self.static_layout().map(|sl| sl.bsatn_length)
1508 }
1509}
1510
1511impl Eq for RowRef<'_> {}
1512impl PartialEq for RowRef<'_> {
1513 fn eq(&self, other: &Self) -> bool {
1514 // Ensure that the layouts are the same
1515 // so that we can use `eq_row_in_page`.
1516 // To do this, we first try address equality on the layouts.
1517 // This should succeed when the rows originate from the same table.
1518 // Otherwise, actually compare the layouts, which is expensive, but unlikely to happen.
1519 let a_ty = self.row_layout();
1520 let b_ty = other.row_layout();
1521 if !(ptr::eq(a_ty, b_ty) || a_ty == b_ty) {
1522 return false;
1523 }
1524 let (page_a, offset_a) = self.page_and_offset();
1525 let (page_b, offset_b) = other.page_and_offset();
1526 let static_layout = self.static_layout();
1527 // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty`
1528 // and `static_layout` is derived from `a_ty`.
1529 unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) }
1530 }
1531}
1532
1533impl PartialEq<ProductValue> for RowRef<'_> {
1534 fn eq(&self, rhs: &ProductValue) -> bool {
1535 let ty = self.row_layout();
1536 let (page, offset) = self.page_and_offset();
1537 // SAFETY: By having `RowRef`,
1538 // we know that `offset` is a valid offset for a row in `page` typed at `ty`.
1539 unsafe { eq_row_in_page_to_pv(self.blob_store, page, offset, rhs, ty) }
1540 }
1541}
1542
1543impl Hash for RowRef<'_> {
1544 fn hash<H: Hasher>(&self, state: &mut H) {
1545 let (page, offset) = self.table.page_and_offset(self.pointer);
1546 let ty = &self.table.row_layout;
1547 // SAFETY: A `RowRef` is a proof that `self.pointer` refers to a live fixed row in `self.table`, so:
1548 // 1. `offset` points at a row in `page` lasting `ty.size()` bytes.
1549 // 2. the row is valid for `ty`.
1550 // 3. for any `vlr: VarLenRef` stored in the row,
1551 // `vlr.first_offset` is either `NULL` or points to a valid granule in `page`.
1552 unsafe { hash_row_in_page(state, page, self.blob_store, offset, ty) };
1553 }
1554}
1555
1556/// An iterator over all the rows, yielded as [`RowRef`]s, in a table.
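///
/// # Example (sketch)
///
/// A hedged usage sketch; `table` and `blob_store` are hypothetical, and the iterator
/// is presumably obtained via `Table::scan_rows`, as in this module's tests:
///
/// ```ignore
/// for row_ref in table.scan_rows(&blob_store) {
///     let pv = row_ref.to_product_value();
///     // ... use `pv` ...
/// }
/// ```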
1557pub struct TableScanIter<'table> {
1558 /// The current page we're yielding rows from.
1559 /// When `None`, the iterator will attempt to advance to the next page, if any.
1560 current_page: Option<FixedLenRowsIter<'table>>,
1561 /// The current page index we are or will visit.
1562 /// The index of the page we are currently visiting, or will visit next.
1563 /// The table the iterator is yielding rows from.
1564 pub(crate) table: &'table Table,
1565 /// The `BlobStore` that row references may refer into.
1566 pub(crate) blob_store: &'table dyn BlobStore,
1567}
1568
1569impl<'a> Iterator for TableScanIter<'a> {
1570 type Item = RowRef<'a>;
1571
1572 fn next(&mut self) -> Option<Self::Item> {
1573 // This could have been written using `.flat_map`,
1574 // but we don't have `type Foo = impl Iterator<...>;` on stable yet.
1575 loop {
1576 match &mut self.current_page {
1577 // We're currently visiting a page,
1578 Some(iter_fixed_len) => {
1579 if let Some(page_offset) = iter_fixed_len.next() {
1580 // There's still at least one row in that page to visit,
1581 // return a ref to that row.
1582 let ptr =
1583 RowPointer::new(false, self.current_page_idx, page_offset, self.table.squashed_offset);
1584
1585 // SAFETY: `page_offset` came from `iter_fixed_len`, so it must point to a valid row.
1586 let row_ref = unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) };
1587 return Some(row_ref);
1588 } else {
1589 // We've finished visiting that page, so set `current_page` to `None`,
1590 // increment `self.current_page_idx` to the index of the next page,
1591 // and go to the `None` case (1) in the match.
1592 self.current_page = None;
1593 self.current_page_idx.0 += 1;
1594 }
1595 }
1596
1597 // (1) If we aren't currently visiting a page,
1598 // the `else` case in the `Some` match arm
1599 // already incremented `self.current_page_idx`,
1600 // or we're just beginning and so it was initialized as 0.
1601 None => {
1602 // If there's another page, set `self.current_page` to it,
1603 // and go to the `Some` case in the match.
1604 let next_page = self.table.pages().get(self.current_page_idx.idx())?;
1605 let iter = next_page.iter_fixed_len(self.table.row_size());
1606 self.current_page = Some(iter);
1607 }
1608 }
1609 }
1610 }
1611}
1612
1613/// A combined table and index,
1614/// allowing direct extraction of an [`IndexScanPointIter`] or [`IndexScanRangeIter`].
1615#[derive(Copy, Clone)]
1616pub struct TableAndIndex<'a> {
1617 table: &'a Table,
1618 blob_store: &'a dyn BlobStore,
1619 index: &'a TableIndex,
1620}
1621
1622impl<'a> TableAndIndex<'a> {
1623 pub fn table(&self) -> &'a Table {
1624 self.table
1625 }
1626
1627 pub fn index(&self) -> &'a TableIndex {
1628 self.index
1629 }
1630
1631 /// Returns an iterator yielding all rows in this index for `key`.
1632 ///
1633 /// Matching is defined by `Ord for AlgebraicValue`.
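///
/// # Example (sketch)
///
/// A hedged sketch; `view` is a hypothetical `TableAndIndex` whose index covers a
/// single `I32` column:
///
/// ```ignore
/// for row_ref in view.seek_point(&AlgebraicValue::I32(42)) {
///     // every yielded row has 42 in the indexed column
/// }
/// ```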
1634 pub fn seek_point(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
1635 IndexScanPointIter {
1636 table: self.table,
1637 blob_store: self.blob_store,
1638 btree_index_iter: self.index.seek_point(key),
1639 }
1640 }
1641
1642 /// Returns an iterator yielding all rows in this index that fall within `range`.
1643 ///
1644 /// Matching is defined by `Ord for AlgebraicValue`.
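///
/// # Example (sketch)
///
/// A hedged sketch; `view` is a hypothetical `TableAndIndex` whose index covers a
/// single `I32` column:
///
/// ```ignore
/// let lo = AlgebraicValue::I32(0);
/// let hi = AlgebraicValue::I32(100);
/// // Every yielded row has an indexed value within `0..=100`.
/// for row_ref in view.seek_range(&(lo..=hi)) {
///     // ...
/// }
/// ```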
1645 pub fn seek_range(&self, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanRangeIter<'a> {
1646 IndexScanRangeIter {
1647 table: self.table,
1648 blob_store: self.blob_store,
1649 btree_index_iter: self.index.seek_range(range),
1650 }
1651 }
1652}
1653
1654/// An iterator using a [`TableIndex`] to scan a `table`
1655/// for all the [`RowRef`]s matching the specified `key` in the indexed column(s).
1656///
1657/// Matching is defined by `Ord for AlgebraicValue`.
1658pub struct IndexScanPointIter<'a> {
1659 /// The table being scanned for rows.
1660 table: &'a Table,
1661 /// The blob store; passed on to the [`RowRef`]s in case they need it.
1662 blob_store: &'a dyn BlobStore,
1663 /// The iterator performing the index scan yielding row pointers.
1664 btree_index_iter: TableIndexPointIter<'a>,
1665}
1666
1667impl<'a> Iterator for IndexScanPointIter<'a> {
1668 type Item = RowRef<'a>;
1669
1670 fn next(&mut self) -> Option<Self::Item> {
1671 let ptr = self.btree_index_iter.next()?;
1672 // FIXME: Determine if this is correct and if so use `_unchecked`.
1673 // Will a table's index necessarily hold only pointers into that table?
1674 // Edge case: if an index is added during a transaction which then scans that index,
1675 // it appears that the newly-created `TxState` index
1676 // will also hold pointers into the `CommittedState`.
1677 //
1678 // SAFETY: Assuming this is correct,
1679 // `ptr` came from the index, which always holds pointers to valid rows.
1680 self.table.get_row_ref(self.blob_store, ptr)
1681 }
1682}
1683
1684/// An iterator using a [`TableIndex`] to scan a `table`
1685/// for all the [`RowRef`]s matching the specified `range` in the indexed column(s).
1686///
1687/// Matching is defined by `Ord for AlgebraicValue`.
1688pub struct IndexScanRangeIter<'a> {
1689 /// The table being scanned for rows.
1690 table: &'a Table,
1691 /// The blob store; passed on to the [`RowRef`]s in case they need it.
1692 blob_store: &'a dyn BlobStore,
1693 /// The iterator performing the index scan yielding row pointers.
1694 btree_index_iter: TableIndexRangeIter<'a>,
1695}
1696
1697impl<'a> Iterator for IndexScanRangeIter<'a> {
1698 type Item = RowRef<'a>;
1699
1700 fn next(&mut self) -> Option<Self::Item> {
1701 let ptr = self.btree_index_iter.next()?;
1702 // FIXME: Determine if this is correct and if so use `_unchecked`.
1703 // Will a table's index necessarily hold only pointers into that table?
1704 // Edge case: if an index is added during a transaction which then scans that index,
1705 // it appears that the newly-created `TxState` index
1706 // will also hold pointers into the `CommittedState`.
1707 //
1708 // SAFETY: Assuming this is correct,
1709 // `ptr` came from the index, which always holds pointers to valid rows.
1710 self.table.get_row_ref(self.blob_store, ptr)
1711 }
1712}
1713
1714#[derive(Error, Debug, PartialEq, Eq)]
1715#[error("Unique constraint violation '{}' in table '{}': column(s): '{:?}' value: {}", constraint_name, table_name, cols, value.to_satn())]
1716pub struct UniqueConstraintViolation {
1717 pub constraint_name: Box<str>,
1718 pub table_name: Box<str>,
1719 pub cols: Vec<Box<str>>,
1720 pub value: AlgebraicValue,
1721}
1722
1723impl UniqueConstraintViolation {
1724 /// Returns a unique constraint violation error for the given `index`
1725 /// and the `value` that would have been duplicated.
1726 #[cold]
1727 fn build(schema: &TableSchema, index: &TableIndex, index_id: IndexId, value: AlgebraicValue) -> Self {
1728 // Fetch the table name.
1729 let table_name = schema.table_name.clone();
1730
1731 // Fetch the names of the columns used in the index.
1732 let cols = index
1733 .indexed_columns
1734 .iter()
1735 .map(|x| schema.columns()[x.idx()].col_name.clone())
1736 .collect();
1737
1738 // Fetch the name of the index.
1739 let constraint_name = schema
1740 .indexes
1741 .iter()
1742 .find(|i| i.index_id == index_id)
1743 .unwrap()
1744 .index_name
1745 .clone();
1746
1747 Self {
1748 constraint_name,
1749 table_name,
1750 cols,
1751 value,
1752 }
1753 }
1754}
1755
1756// Private API:
1757impl Table {
1758 /// Returns a unique constraint violation error for the given `index`
1759 /// and the `value` that would have been duplicated.
1760 #[cold]
1761 pub fn build_error_unique(
1762 &self,
1763 index: &TableIndex,
1764 index_id: IndexId,
1765 value: AlgebraicValue,
1766 ) -> UniqueConstraintViolation {
1767 let schema = self.get_schema();
1768 UniqueConstraintViolation::build(schema, index, index_id, value)
1769 }
1770
1771 /// Returns a new empty table using the particulars passed.
1772 fn new_with_indexes_capacity(
1773 schema: Arc<TableSchema>,
1774 row_layout: RowTypeLayout,
1775 static_layout: Option<(StaticLayout, StaticBsatnValidator)>,
1776 visitor_prog: VarLenVisitorProgram,
1777 squashed_offset: SquashedOffset,
1778 pointer_map: Option<PointerMap>,
1779 ) -> Self {
1780 Self {
1781 inner: TableInner {
1782 row_layout,
1783 static_layout,
1784 visitor_prog,
1785 pages: Pages::default(),
1786 },
1787 is_scheduler: schema.schedule.is_some(),
1788 schema,
1789 indexes: BTreeMap::new(),
1790 pointer_map,
1791 squashed_offset,
1792 row_count: 0,
1793 blob_store_bytes: BlobNumBytes::default(),
1794 }
1795 }
1796
1797 /// Returns whether the row at `ptr` is present or not.
1798 // TODO: Remove all uses of this method,
1799 // or more likely, gate them behind `debug_assert!`
1800 // so they don't have semantic meaning.
1801 //
1802 // Unlike the previous `locking_tx_datastore::Table`'s `RowId`,
1803 // `RowPointer` is not content-addressed.
1804 // This means it is possible to:
1805 // - have a `RowPointer` A* to row A,
1806 // - Delete row A,
1807 // - Insert row B into the same storage that was freed by deleting row A,
1808 // - Test `is_row_present(A*)`, which falsely reports that row A is still present.
1809 //
1810 // In the final interface, this method is superfluous anyways,
1811 // as `RowPointer` is not part of our public interface.
1812 // Instead, we will always discover a known-present `RowPointer`
1813 // during a table scan or index seek.
1814 // As such, our `delete` and `insert` methods can be `unsafe`
1815 // and trust that the `RowPointer` is valid.
1816 fn is_row_present(&self, ptr: RowPointer) -> bool {
1817 if self.squashed_offset != ptr.squashed_offset() {
1818 return false;
1819 }
1820 let Some((page, offset)) = self.inner.try_page_and_offset(ptr) else {
1821 return false;
1822 };
1823 page.has_row_offset(self.row_size(), offset)
1824 }
1825
1826 /// Returns the row size for a row in the table.
1827 pub fn row_size(&self) -> Size {
1828 self.row_layout().size()
1829 }
1830
1831 /// Returns the layout for a row in the table.
1832 fn row_layout(&self) -> &RowTypeLayout {
1833 &self.inner.row_layout
1834 }
1835
1836 /// Returns the pages storing the physical rows of this table.
1837 fn pages(&self) -> &Pages {
1838 &self.inner.pages
1839 }
1840
1841 /// Iterates over each [`Page`] in this table, ensuring that its hash is computed before yielding it.
1842 ///
1843 /// Used when capturing a snapshot.
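///
/// # Example (sketch)
///
/// A hedged usage sketch; the snapshot-writing side is hypothetical:
///
/// ```ignore
/// for (hash, page) in table.iter_pages_with_hashes() {
///     // persist `page` in the snapshot, keyed by `hash` ...
/// }
/// ```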
1844 pub fn iter_pages_with_hashes(&mut self) -> impl Iterator<Item = (blake3::Hash, &Page)> {
1845 self.inner.pages.iter_mut().map(|page| {
1846 let hash = page.save_or_get_content_hash();
1847 (hash, &**page)
1848 })
1849 }
1850
1851 /// Returns the number of pages storing the physical rows of this table.
1852 fn num_pages(&self) -> usize {
1853 self.inner.pages.len()
1854 }
1855
1856 /// Returns the [`StaticLayout`] for this table, if any.
1857 pub(crate) fn static_layout(&self) -> Option<&StaticLayout> {
1858 self.inner.static_layout.as_ref().map(|(s, _)| s)
1859 }
1860
1861 /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them.
1862 ///
1863 /// Called when restoring from a snapshot, after installing the pages
1864 /// and computing the row count,
1865 /// since snapshots do not save the pointer map.
1866 fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) {
1867 // TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`.
1868 // Alternatively, do this at the same time as `compute_row_count`.
1869 let ptrs = self
1870 .scan_rows(blob_store)
1871 .map(|row_ref| (row_ref.row_hash(), row_ref.pointer()))
1872 .collect::<PointerMap>();
1873 self.pointer_map = Some(ptrs);
1874 }
1875
1876 /// Compute and store `self.row_count` and `self.blob_store_bytes`
1877 /// by iterating over all the rows in `self` and counting them.
1878 ///
1879 /// Called when restoring from a snapshot after installing the pages,
1880 /// since snapshots do not save this metadata.
1881 fn compute_row_count(&mut self, blob_store: &dyn BlobStore) {
1882 let mut row_count = 0;
1883 let mut blob_store_bytes = 0;
1884 for row in self.scan_rows(blob_store) {
1885 row_count += 1;
1886 blob_store_bytes += row.blob_store_bytes();
1887 }
1888 self.row_count = row_count as u64;
1889 self.blob_store_bytes = blob_store_bytes.into();
1890 }
1891}
1892
1893#[cfg(test)]
1894pub(crate) mod test {
1895 use super::*;
1896 use crate::blob_store::{HashMapBlobStore, NullBlobStore};
1897 use crate::page::tests::hash_unmodified_save_get;
1898 use crate::var_len::VarLenGranule;
1899 use proptest::prelude::*;
1900 use proptest::test_runner::TestCaseResult;
1901 use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder};
1902 use spacetimedb_primitives::{col_list, TableId};
1903 use spacetimedb_sats::bsatn::to_vec;
1904 use spacetimedb_sats::proptest::{generate_typed_row, generate_typed_row_vec};
1905 use spacetimedb_sats::{product, AlgebraicType, ArrayValue};
1906 use spacetimedb_schema::def::{BTreeAlgorithm, ModuleDef};
1907 use spacetimedb_schema::schema::Schema as _;
1908
1909 /// Create a `Table` from a `ProductType` without validation.
1910 pub(crate) fn table(ty: ProductType) -> Table {
1911 // Use a fast path here to avoid slowing down Miri in the proptests.
1912 // Does not perform validation.
1913 let schema = TableSchema::from_product_type(ty);
1914 Table::new(schema.into(), SquashedOffset::COMMITTED_STATE)
1915 }
1916
1917 #[test]
1918 fn unique_violation_error() {
1919 let table_name = "UniqueIndexed";
1920 let index_name = "UniqueIndexed_unique_col_idx_btree";
1921 let mut builder = RawModuleDefV9Builder::new();
1922 builder
1923 .build_table_with_new_type(
1924 table_name,
1925 ProductType::from([("unique_col", AlgebraicType::I32), ("other_col", AlgebraicType::I32)]),
1926 true,
1927 )
1928 .with_unique_constraint(0)
1929 .with_index(
1930 RawIndexAlgorithm::BTree { columns: col_list![0] },
1931 "accessor_name_doesnt_matter",
1932 );
1933
1934 let def: ModuleDef = builder.finish().try_into().expect("Failed to build schema");
1935
1936 let schema = TableSchema::from_module_def(&def, def.table(table_name).unwrap(), (), TableId::SENTINEL);
1937 assert_eq!(schema.indexes.len(), 1);
1938 let index_schema = schema.indexes[0].clone();
1939
1940 let mut table = Table::new(schema.into(), SquashedOffset::COMMITTED_STATE);
1941 let cols = ColList::new(0.into());
1942 let algo = BTreeAlgorithm { columns: cols.clone() }.into();
1943
1944 let index = table.new_index(&algo, true).unwrap();
1945 // SAFETY: Index was derived from `table`.
1946 unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) };
1947
1948 // Reserve a page so that we can check the hash.
1949 let pi = table.inner.pages.reserve_empty_page(table.row_size()).unwrap();
1950 let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
1951
1952 // Insert the row (0, 0).
1953 table
1954 .insert(&mut NullBlobStore, &product![0i32, 0i32])
1955 .expect("Initial insert failed");
1956
1957 // Inserting cleared the hash.
1958 let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[pi]);
1959 assert_ne!(hash_pre_ins, hash_post_ins);
1960
1961 // Try to insert the row (0, 1), and assert that we get the expected error.
1962 match table.insert(&mut NullBlobStore, &product![0i32, 1i32]) {
1963 Ok(_) => panic!("Second insert with same unique value succeeded"),
1964 Err(InsertError::IndexError(UniqueConstraintViolation {
1965 constraint_name,
1966 table_name,
1967 cols,
1968 value,
1969 })) => {
1970 assert_eq!(&*constraint_name, index_name);
1971 assert_eq!(&*table_name, "UniqueIndexed");
1972 assert_eq!(cols.iter().map(|c| c.to_string()).collect::<Vec<_>>(), &["unique_col"]);
1973 assert_eq!(value, AlgebraicValue::I32(0));
1974 }
1975 Err(e) => panic!("Expected UniqueConstraintViolation but found {:?}", e),
1976 }
1977
1978 // The second insert cleared the hash even though it hit a constraint violation,
1979 // because constraint checking happens after the physical insertion, which is then rolled back.
1980 assert_eq!(table.inner.pages[pi].unmodified_hash(), None);
1981 }
1982
1983 fn insert_retrieve_body(ty: impl Into<ProductType>, val: impl Into<ProductValue>) -> TestCaseResult {
1984 let val = val.into();
1985 let mut blob_store = HashMapBlobStore::default();
1986 let mut table = table(ty.into());
1987 let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
1988 let hash = hash.unwrap();
1989 prop_assert_eq!(row.row_hash(), hash);
1990 let ptr = row.pointer();
1991 prop_assert_eq!(table.pointers_for(hash), &[ptr]);
1992
1993 prop_assert_eq!(table.inner.pages.len(), 1);
1994 prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
1995
1996 let row_ref = table.get_row_ref(&blob_store, ptr).unwrap();
1997 prop_assert_eq!(row_ref.to_product_value(), val.clone());
1998 let bsatn_val = to_vec(&val).unwrap();
1999 prop_assert_eq!(&bsatn_val, &to_vec(&row_ref).unwrap());
2000 prop_assert_eq!(&bsatn_val, &row_ref.to_bsatn_vec().unwrap());
2001
2002 prop_assert_eq!(
2003 &table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(),
2004 &[ptr]
2005 );
2006
2007 Ok(())
2008 }
2009
2010 #[test]
2011 fn repro_serialize_bsatn_empty_array() {
2012 let ty = AlgebraicType::array(AlgebraicType::U64);
2013 let arr = ArrayValue::from(Vec::<u64>::new().into_boxed_slice());
2014 insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2015 }
2016
2017 #[test]
2018 fn repro_serialize_bsatn_debug_assert() {
2019 let ty = AlgebraicType::array(AlgebraicType::U64);
2020 let arr = ArrayValue::from((0..130u64).collect::<Box<_>>());
2021 insert_retrieve_body(ty, AlgebraicValue::from(arr)).unwrap();
2022 }
2023
2024 fn reconstruct_index_num_key_bytes(table: &Table, blob_store: &dyn BlobStore, index_id: IndexId) -> u64 {
2025 let index = table.get_index_by_id(index_id).unwrap();
2026
2027 index
2028 .seek_range(&(..))
2029 .map(|row_ptr| {
2030 let row_ref = table.get_row_ref(blob_store, row_ptr).unwrap();
2031 let key = row_ref.project(&index.indexed_columns).unwrap();
2032 crate::table_index::KeySize::key_size_in_bytes(&key) as u64
2033 })
2034 .sum()
2035 }
2036
2037 /// Given a row type `ty`, a set of rows of that type `vals`,
2038 /// and a set of columns within that type `indexed_columns`,
2039 /// populate a table with `vals`, add an index on the `indexed_columns`,
2040 /// and perform various assertions that the reported index size metrics are correct.
2041 fn test_index_size_reporting(
2042 ty: ProductType,
2043 vals: Vec<ProductValue>,
2044 indexed_columns: ColList,
2045 ) -> Result<(), TestCaseError> {
2046 let mut blob_store = HashMapBlobStore::default();
2047 let mut table = table(ty.clone());
2048
2049 for row in &vals {
2050 prop_assume!(table.insert(&mut blob_store, row).is_ok());
2051 }
2052
2053 // We haven't added any indexes yet, so there should be 0 rows in indexes.
2054 prop_assert_eq!(table.num_rows_in_indexes(), 0);
2055
2056 let index_id = IndexId(0);
2057
2058 let algo = BTreeAlgorithm {
2059 columns: indexed_columns.clone(),
2060 }
2061 .into();
2062 let index = TableIndex::new(&ty, &algo, false).unwrap();
2063 // Add an index on the `indexed_columns`.
2064 // Safety:
2065 // We're using `ty` as the row type for both `table` and the new index.
2066 unsafe { table.insert_index(&blob_store, index_id, index) };
2067
2068 // We have one index, which should be fully populated,
2069 // so in total we should have the same number of rows in indexes as we have rows.
2070 prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows());
2071
2072 let index = table.get_index_by_id(index_id).unwrap();
2073
2074 // One index, so table's reporting of bytes used should match that index's reporting.
2075 prop_assert_eq!(table.bytes_used_by_index_keys(), index.num_key_bytes());
2076
2077 // Walk all the rows in the index, sum their key size,
2078 // and assert it matches the `index.num_key_bytes()`
2079 prop_assert_eq!(
2080 index.num_key_bytes(),
2081 reconstruct_index_num_key_bytes(&table, &blob_store, index_id)
2082 );
2083
2084 // Walk all the rows we inserted, project them to the cols that will be their keys,
2085 // sum their key size,
2086 // and assert it matches the `index.num_key_bytes()`
2087 let key_size_in_pvs = vals
2088 .iter()
2089 .map(|row| crate::table_index::KeySize::key_size_in_bytes(&row.project(&indexed_columns).unwrap()) as u64)
2090 .sum();
2091 prop_assert_eq!(index.num_key_bytes(), key_size_in_pvs);
2092
2093 let algo = BTreeAlgorithm {
2094 columns: indexed_columns,
2095 }
2096 .into();
2097 let index = TableIndex::new(&ty, &algo, false).unwrap();
2098 // Add a duplicate of the same index, so we can check that all above quantities double.
2099 // Safety:
2100 // As above, we're using `ty` as the row type for both `table` and the new index.
2101 unsafe { table.insert_index(&blob_store, IndexId(1), index) };
2102
2103 prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2);
2104 prop_assert_eq!(table.bytes_used_by_index_keys(), key_size_in_pvs * 2);
2105
2106 Ok(())
2107 }
2108
2109 proptest! {
2110 #![proptest_config(ProptestConfig { max_shrink_iters: 0x10000000, ..Default::default() })]
2111
2112 #[test]
2113 fn insert_retrieve((ty, val) in generate_typed_row()) {
2114 insert_retrieve_body(ty, val)?;
2115 }
2116
2117 #[test]
2118 fn insert_delete_removed_from_pointer_map((ty, val) in generate_typed_row()) {
2119 let mut blob_store = HashMapBlobStore::default();
2120 let mut table = table(ty);
2121 let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2122 let hash = hash.unwrap();
2123 prop_assert_eq!(row.row_hash(), hash);
2124 let ptr = row.pointer();
2125 prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2126
2127 prop_assert_eq!(table.inner.pages.len(), 1);
2128 prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2129 prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2130 prop_assert_eq!(table.row_count, 1);
2131
2132 let hash_pre_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2133
2134 table.delete(&mut blob_store, ptr, |_| ());
2135
2136 let hash_post_del = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2137 assert_ne!(hash_pre_del, hash_post_del);
2138
2139 prop_assert_eq!(table.pointers_for(hash), &[]);
2140
2141 prop_assert_eq!(table.inner.pages.len(), 1);
2142 prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 0);
2143 prop_assert_eq!(table.row_count, 0);
2144
2145 prop_assert!(&table.scan_rows(&blob_store).next().is_none());
2146 }
2147
2148 #[test]
2149 fn insert_duplicate_set_semantic((ty, val) in generate_typed_row()) {
2150 let mut blob_store = HashMapBlobStore::default();
2151 let mut table = table(ty);
2152
2153 let (hash, row) = table.insert(&mut blob_store, &val).unwrap();
2154 let hash = hash.unwrap();
2155 prop_assert_eq!(row.row_hash(), hash);
2156 let ptr = row.pointer();
2157 prop_assert_eq!(table.inner.pages.len(), 1);
2158 prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2159 prop_assert_eq!(table.row_count, 1);
2160 prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2161
2162 let blob_uses = blob_store.usage_counter();
2163
2164 let hash_pre_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2165
2166 prop_assert!(table.insert(&mut blob_store, &val).is_err());
2167
2168 // Hash was cleared and is different despite failure to insert.
2169 let hash_post_ins = hash_unmodified_save_get(&mut table.inner.pages[ptr.page_index()]);
2170 assert_ne!(hash_pre_ins, hash_post_ins);
2171
2172 prop_assert_eq!(table.row_count, 1);
2173 prop_assert_eq!(table.inner.pages.len(), 1);
2174 prop_assert_eq!(table.pointers_for(hash), &[ptr]);
2175
2176 let blob_uses_after = blob_store.usage_counter();
2177
2178 prop_assert_eq!(blob_uses_after, blob_uses);
2179 prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1);
2180 prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::<Vec<_>>(), &[ptr]);
2181 }
2182
2183 #[test]
2184 fn insert_bsatn_same_as_pv((ty, val) in generate_typed_row()) {
2185 let mut bs_pv = HashMapBlobStore::default();
2186 let mut table_pv = table(ty.clone());
2187 let res_pv = table_pv.insert(&mut bs_pv, &val);
2188
2189 let mut bs_bsatn = HashMapBlobStore::default();
2190 let mut table_bsatn = table(ty);
2191 let res_bsatn = insert_bsatn(&mut table_bsatn, &mut bs_bsatn, &val);
2192
2193 prop_assert_eq!(res_pv, res_bsatn);
2194 prop_assert_eq!(bs_pv, bs_bsatn);
2195 prop_assert_eq!(table_pv, table_bsatn);
2196 }
2197
2198 #[test]
2199 fn row_size_reporting_matches_slow_implementations((ty, vals) in generate_typed_row_vec(128, 2048)) {
2200 let mut blob_store = HashMapBlobStore::default();
2201 let mut table = table(ty.clone());
2202
2203 for row in &vals {
2204 prop_assume!(table.insert(&mut blob_store, row).is_ok());
2205 }
2206
2207 prop_assert_eq!(table.bytes_used_by_rows(), table.reconstruct_bytes_used_by_rows());
2208 prop_assert_eq!(table.num_rows(), table.reconstruct_num_rows());
2209 prop_assert_eq!(table.num_rows(), vals.len() as u64);
2210
2211 // TODO(testing): Determine if there's a meaningful way to test that the blob store reporting is correct.
2212 // I (pgoldman 2025-01-27) doubt it, as the test would be "visit every blob and sum their size,"
2213 // which is already what the actual implementation does.
2214 }
2215
2216 #[test]
2217 fn index_size_reporting_matches_slow_implementations_single_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2218 prop_assume!(!ty.elements.is_empty());
2219
2220 test_index_size_reporting(ty, vals, ColList::from(ColId(0)))?;
2221 }
2222
2223 #[test]
2224 fn index_size_reporting_matches_slow_implementations_two_column((ty, vals) in generate_typed_row_vec(128, 2048)) {
2225 prop_assume!(ty.elements.len() >= 2);
2226
2227
2228 test_index_size_reporting(ty, vals, ColList::from([ColId(0), ColId(1)]))?;
2229 }
2230 }
2231
2232 fn insert_bsatn<'a>(
2233 table: &'a mut Table,
2234 blob_store: &'a mut dyn BlobStore,
2235 val: &ProductValue,
2236 ) -> Result<(Option<RowHash>, RowRef<'a>), InsertError> {
2237 let row = &to_vec(&val).unwrap();
2238
2239 // Optimistically insert the `row` before checking any constraints
2240 // under the assumption that errors (unique constraint & set semantic violations) are rare.
2241 let (row_ref, blob_bytes) = table.insert_physically_bsatn(blob_store, row)?;
2242 let row_ptr = row_ref.pointer();
2243
2244 // Confirm the insertion, checking any constraints, removing the physical row on error.
2245 // SAFETY: We just inserted `row_ptr`, so it must be present.
2246 let (hash, row_ptr) = unsafe { table.confirm_insertion(blob_store, row_ptr, blob_bytes) }?;
2247 // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row.
2248 let row_ref = unsafe { table.inner.get_row_ref_unchecked(blob_store, row_ptr) };
2249 Ok((hash, row_ref))
2250 }
2251
2252 // Compare `scan_rows` against a simpler implementation.
2253 #[test]
2254 fn table_scan_iter_eq_flatmap() {
2255 let mut blob_store = HashMapBlobStore::default();
2256 let mut table = table(AlgebraicType::U64.into());
2257 for v in 0..2u64.pow(14) {
2258 table.insert(&mut blob_store, &product![v]).unwrap();
2259 }
2260
2261 let complex = table.scan_rows(&blob_store).map(|r| r.pointer());
2262 let simple = table
2263 .inner
2264 .pages
2265 .iter()
2266 .zip((0..).map(PageIndex))
2267 .flat_map(|(page, pi)| {
2268 page.iter_fixed_len(table.row_size())
2269 .map(move |po| RowPointer::new(false, pi, po, table.squashed_offset))
2270 });
2271 assert!(complex.eq(simple));
2272 }
2273
2274 #[test]
2275 #[should_panic]
2276 fn read_row_unaligned_page_offset_soundness() {
2277 // Insert a `u64` into a table.
2278 let pt = AlgebraicType::U64.into();
2279 let pv = product![42u64];
2280 let mut table = table(pt);
2281 let blob_store = &mut NullBlobStore;
2282 let (_, row_ref) = table.insert(blob_store, &pv).unwrap();
2283
2284 // Manipulate the page offset to 1 instead of 0.
2285 // This now points into the "middle" of a row.
2286 let ptr = row_ref.pointer().with_page_offset(PageOffset(1));
2287
2288 // We expect this to panic.
2289 // Miri should not have any issue with this call either.
2290 table.get_row_ref(&NullBlobStore, ptr).unwrap().to_product_value();
2291 }
2292
2293 #[test]
2294 fn test_blob_store_bytes() {
2295 let pt: ProductType = [AlgebraicType::String, AlgebraicType::I32].into();
2296 let blob_store = &mut HashMapBlobStore::default();
2297 let mut insert =
2298 |table: &mut Table, string, num| table.insert(blob_store, &product![string, num]).unwrap().1.pointer();
2299 let mut table1 = table(pt.clone());
2300
2301 // Insert short string, `blob_store_bytes` should be 0.
2302 let short_str = std::str::from_utf8(&[98; 6]).unwrap();
2303 let short_row_ptr = insert(&mut table1, short_str, 0);
2304 assert_eq!(table1.blob_store_bytes.0, 0);
2305
2306 // Insert long string, `blob_store_bytes` should be the length of the string.
2307 const BLOB_OBJ_LEN: BlobNumBytes = BlobNumBytes(VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD + 1);
2308 let long_str = std::str::from_utf8(&[98; BLOB_OBJ_LEN.0]).unwrap();
2309 let long_row_ptr = insert(&mut table1, long_str, 0);
2310 assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2311
2312 // Insert previous long string in the same table,
2313 // `blob_store_bytes` should count the length twice,
2314 // even though `HashMapBlobStore` deduplicates it.
2315 let long_row_ptr2 = insert(&mut table1, long_str, 1);
2316 const BLOB_OBJ_LEN_2X: BlobNumBytes = BlobNumBytes(BLOB_OBJ_LEN.0 * 2);
2317 assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2318
2319 // Insert previous long string in a new table,
2320 // `blob_store_bytes` should show the length,
2321 // even though `HashMapBlobStore` deduplicates it.
2322 let mut table2 = table(pt);
2323 let _ = insert(&mut table2, long_str, 0);
2324 assert_eq!(table2.blob_store_bytes, BLOB_OBJ_LEN);
2325
2326 // Delete `short_str` row. This should not affect the byte count.
2327 table1.delete(blob_store, short_row_ptr, |_| ()).unwrap();
2328 assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN_2X);
2329
2330 // Delete the first long string row. This gets us down to `BLOB_OBJ_LEN` (we had 2x before).
2331 table1.delete(blob_store, long_row_ptr, |_| ()).unwrap();
2332 assert_eq!(table1.blob_store_bytes, BLOB_OBJ_LEN);
2333
2334 // Delete the second long string row. This gets us down to 0 (we've now deleted both long rows).
2335 table1.delete(blob_store, long_row_ptr2, |_| ()).unwrap();
2336 assert_eq!(table1.blob_store_bytes, 0.into());
2337 }
2338
2339 /// Assert that calling `get_row_ref` to get a row ref to a non-existent `RowPointer`
2340 /// does not panic.
2341 #[test]
2342 fn get_row_ref_no_panic() {
2343 let blob_store = &mut HashMapBlobStore::default();
2344 let table = table([AlgebraicType::String, AlgebraicType::I32].into());
2345
2346 // This row pointer has an incorrect `SquashedOffset`, and so does not point into `table`.
2347 assert!(table
2348 .get_row_ref(
2349 blob_store,
2350 RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::TX_STATE),
2351 )
2352 .is_none());
2353
2354 // This row pointer has the correct `SquashedOffset`, but points out-of-bounds within `table`.
2355 assert!(table
2356 .get_row_ref(
2357 blob_store,
2358 RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::COMMITTED_STATE),
2359 )
2360 .is_none());
2361 }
2362}