use super::{
blob_store::BlobStore,
fixed_bit_set::FixedBitSet,
indexes::{Byte, Bytes, PageOffset, PAGE_HEADER_SIZE, PAGE_SIZE},
var_len::{is_granule_offset_aligned, VarLenGranule, VarLenGranuleHeader, VarLenMembers, VarLenRef},
};
use crate::{fixed_bit_set::IterSet, indexes::max_rows_in_page, static_assert_size, table::BlobNumBytes};
use core::{mem, ops::ControlFlow};
use spacetimedb_sats::layout::{Size, MIN_ROW_SIZE};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{de::Deserialize, ser::Serialize};
use thiserror::Error;
#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
#[error("Want to allocate a var-len object of {need} granules, but have only {have} granules available")]
InsufficientVarLenSpace { need: u16, have: u16 },
#[error("Want to allocate a fixed-len row of {} bytes, but the page is full", need.len())]
InsufficientFixedLenSpace { need: Size },
}
#[repr(C)] #[derive(Clone, Copy, Debug, PartialEq, Eq, bytemuck::NoUninit, Serialize, Deserialize)]
struct FreeCellRef {
next: PageOffset,
}
impl MemoryUsage for FreeCellRef {
fn heap_usage(&self) -> usize {
let Self { next } = self;
next.heap_usage()
}
}
impl FreeCellRef {
const NIL: Self = Self {
next: PageOffset::PAGE_END,
};
#[inline]
fn replace(&mut self, offset: PageOffset) -> FreeCellRef {
let next = mem::replace(&mut self.next, offset);
Self { next }
}
#[inline]
const fn has(&self) -> bool {
!self.next.is_at_end()
}
#[inline]
unsafe fn take_freelist_head(
self: &mut FreeCellRef,
row_data: &Bytes,
adjust_free: impl FnOnce(PageOffset) -> PageOffset,
) -> Option<PageOffset> {
self.has().then(|| {
let head = adjust_free(self.next);
let next = unsafe { get_ref(row_data, head) };
self.replace(*next).next
})
}
#[inline]
unsafe fn prepend_freelist(
self: &mut FreeCellRef,
row_data: &mut Bytes,
new_head: PageOffset,
adjust_free: impl FnOnce(PageOffset) -> PageOffset,
) {
let next = self.replace(new_head);
let new_head = adjust_free(new_head);
let next_slot: &mut FreeCellRef = unsafe { get_mut(row_data, new_head) };
*next_slot = next;
}
}
#[repr(C)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] struct FixedHeader {
next_free: FreeCellRef,
last: PageOffset,
num_rows: u16,
present_rows: FixedBitSet,
}
impl MemoryUsage for FixedHeader {
fn heap_usage(&self) -> usize {
let Self {
next_free,
last,
num_rows,
present_rows,
} = self;
next_free.heap_usage() + last.heap_usage() + num_rows.heap_usage() + present_rows.heap_usage()
}
}
static_assert_size!(FixedHeader, 16);
impl FixedHeader {
#[inline]
fn new(max_rows_in_page: usize) -> Self {
Self {
next_free: FreeCellRef::NIL,
last: PageOffset::VAR_LEN_NULL,
num_rows: 0,
present_rows: FixedBitSet::new(max_rows_in_page),
}
}
#[inline]
fn set_row_present(&mut self, offset: PageOffset, fixed_row_size: Size) {
self.set_row_presence(offset, fixed_row_size, true);
self.num_rows += 1;
}
#[inline]
fn set_row_presence(&mut self, offset: PageOffset, fixed_row_size: Size, present: bool) {
self.present_rows.set(offset / fixed_row_size, present);
}
#[inline]
fn is_row_present(&self, offset: PageOffset, fixed_row_size: Size) -> bool {
self.present_rows.get(offset / fixed_row_size)
}
fn reset_for(&mut self, max_rows_in_page: usize) {
self.next_free = FreeCellRef::NIL;
self.last = PageOffset::VAR_LEN_NULL;
self.num_rows = 0;
self.present_rows.reset_for(max_rows_in_page);
}
#[inline]
fn clear(&mut self) {
self.next_free = FreeCellRef::NIL;
self.last = PageOffset::VAR_LEN_NULL;
self.num_rows = 0;
self.present_rows.clear();
}
}
#[repr(C)] #[derive(bytemuck::NoUninit, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct VarHeader {
next_free: FreeCellRef,
freelist_len: u16,
first: PageOffset,
num_granules: u16,
}
impl MemoryUsage for VarHeader {
fn heap_usage(&self) -> usize {
let Self {
next_free,
freelist_len,
first,
num_granules,
} = self;
next_free.heap_usage() + freelist_len.heap_usage() + first.heap_usage() + num_granules.heap_usage()
}
}
static_assert_size!(VarHeader, 8);
impl Default for VarHeader {
fn default() -> Self {
Self {
next_free: FreeCellRef::NIL,
freelist_len: 0,
first: PageOffset::PAGE_END,
num_granules: 0,
}
}
}
impl VarHeader {
fn clear(&mut self) {
*self = Self::default();
}
}
#[repr(C)] #[repr(align(64))] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub(super) struct PageHeader {
fixed: FixedHeader,
var: VarHeader,
unmodified_hash: Option<blake3::Hash>,
}
impl MemoryUsage for PageHeader {
fn heap_usage(&self) -> usize {
let Self {
fixed,
var,
unmodified_hash: _,
} = self;
fixed.heap_usage() + var.heap_usage()
}
}
static_assert_size!(PageHeader, PAGE_HEADER_SIZE);
impl PageHeader {
fn new(max_rows_in_page: usize) -> Self {
Self {
fixed: FixedHeader::new(max_rows_in_page),
var: VarHeader::default(),
unmodified_hash: None,
}
}
fn reset_for(&mut self, max_rows_in_page: usize) {
self.fixed.reset_for(max_rows_in_page);
self.var.clear();
self.unmodified_hash = None;
}
fn clear(&mut self) {
self.fixed.clear();
self.var.clear();
self.unmodified_hash = None;
}
pub(super) fn max_rows_in_page(&self) -> usize {
self.fixed.present_rows.bits()
}
#[cfg(test)]
pub(super) fn present_rows_storage_ptr_for_test(&self) -> *const () {
self.fixed.present_rows.storage().as_ptr().cast()
}
}
const _MIN_ROW_SIZE_CAN_STORE_FCR: () = assert!(MIN_ROW_SIZE.len() >= mem::size_of::<FreeCellRef>());
const _VLG_CAN_STORE_FCR: () = assert!(VarLenGranule::SIZE.len() >= MIN_ROW_SIZE.len());
const _VLG_ALIGN_MULTIPLE_OF_FCR: () = assert!(mem::align_of::<VarLenGranule>() % mem::align_of::<FreeCellRef>() == 0);
type RowData = [Byte; PageOffset::PAGE_END.idx()];
#[repr(C)]
#[repr(align(64))]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Page {
header: PageHeader,
row_data: RowData,
}
impl MemoryUsage for Page {
fn heap_usage(&self) -> usize {
self.header.heap_usage()
}
}
static_assert_size!(Page, PAGE_SIZE);
pub struct FixedView<'page> {
fixed_row_data: &'page mut Bytes,
header: &'page mut FixedHeader,
}
impl FixedView<'_> {
pub fn get_row_mut(&mut self, start: PageOffset, fixed_row_size: Size) -> &mut Bytes {
&mut self.fixed_row_data[start.range(fixed_row_size)]
}
fn get_row(&mut self, start: PageOffset, fixed_row_size: Size) -> &Bytes {
&self.fixed_row_data[start.range(fixed_row_size)]
}
pub unsafe fn free(&mut self, row_offset: PageOffset, fixed_row_size: Size) {
unsafe {
self.header
.next_free
.prepend_freelist(self.fixed_row_data, row_offset, |x| x)
};
self.header.num_rows -= 1;
self.header.set_row_presence(row_offset, fixed_row_size, false);
}
}
pub struct VarView<'page> {
var_row_data: &'page mut Bytes,
header: &'page mut VarHeader,
last_fixed: PageOffset,
}
impl<'page> VarView<'page> {
fn has_enough_space_for(&self, len_in_bytes: usize) -> (usize, bool, bool) {
let (num_granules_req, in_blob) = VarLenGranule::bytes_to_granules(len_in_bytes);
let enough_space = num_granules_req <= self.num_granules_available();
(num_granules_req, enough_space, in_blob)
}
fn num_granules_available(&self) -> usize {
self.header.freelist_len as usize
+ VarLenGranule::space_to_granules(gap_remaining_size(self.header.first, self.last_fixed))
}
#[inline(always)]
fn adjuster(&self) -> impl FnOnce(PageOffset) -> PageOffset {
let lf = self.last_fixed;
move |offset| offset - lf
}
pub fn alloc_for_len(&mut self, obj_len: usize) -> Result<(VarLenRef, bool), Error> {
self.alloc_for_obj_common(obj_len, |req_granules| {
let rem = obj_len % VarLenGranule::DATA_SIZE;
(0..req_granules).map(move |rev_idx| {
let len = if rev_idx == 0 && rem != 0 {
rem
} else {
VarLenGranule::DATA_SIZE
};
(<&[u8]>::default(), len)
})
})
}
pub unsafe fn granule_offset_iter(&mut self, first_granule: PageOffset) -> GranuleOffsetIter<'_, 'page> {
GranuleOffsetIter {
next_granule: first_granule,
var_view: self,
}
}
pub fn alloc_for_slice(&mut self, slice: &[u8]) -> Result<(VarLenRef, bool), Error> {
let obj_len = slice.len();
let chunks = |_| VarLenGranule::chunks(slice).rev().map(|c| (c, c.len()));
self.alloc_for_obj_common(obj_len, chunks)
}
#[expect(clippy::doc_overindented_list_items)]
fn alloc_for_obj_common<'chunk, Cs: Iterator<Item = (&'chunk [u8], usize)>>(
&mut self,
obj_len: usize,
chunks: impl Copy + FnOnce(usize) -> Cs,
) -> Result<(VarLenRef, bool), Error> {
let (req_granules, enough_space, in_blob) = self.has_enough_space_for(obj_len);
if !enough_space {
return Err(Error::InsufficientVarLenSpace {
need: req_granules.try_into().unwrap_or(u16::MAX),
have: self.num_granules_available().try_into().unwrap_or(u16::MAX),
});
}
if in_blob {
let vlr = self.alloc_blob_hash()?;
return Ok((vlr, true));
};
let mut next = PageOffset::VAR_LEN_NULL;
debug_assert_eq!(obj_len, chunks(req_granules).map(|(_, len)| len).sum::<usize>());
for (chunk, len) in chunks(req_granules) {
let granule = self.alloc_granule()?;
unsafe { self.write_chunk_to_granule(chunk, len, granule, next) };
next = granule;
}
Ok((
VarLenRef {
first_granule: next,
length_in_bytes: obj_len as u16,
},
false,
))
}
#[cold]
fn alloc_blob_hash(&mut self) -> Result<VarLenRef, Error> {
self.alloc_granule().map(VarLenRef::large_blob)
}
pub unsafe fn write_large_blob_hash_to_granule(
&mut self,
blob_store: &mut dyn BlobStore,
var_len_obj: &impl AsRef<[u8]>,
vlr: VarLenRef,
) -> BlobNumBytes {
let hash = blob_store.insert_blob(var_len_obj.as_ref());
let granule = vlr.first_granule;
unsafe { self.write_chunk_to_granule(&hash.data, hash.data.len(), granule, PageOffset::VAR_LEN_NULL) };
var_len_obj.as_ref().len().into()
}
unsafe fn write_chunk_to_granule(&mut self, chunk: &[u8], len: usize, granule: PageOffset, next: PageOffset) {
let granule = self.adjuster()(granule);
let ptr: *mut VarLenGranule = unsafe { offset_to_ptr_mut(self.var_row_data, granule).cast() };
let header = unsafe { &raw mut (*ptr).header };
unsafe {
header.write(VarLenGranuleHeader::new(len as u8, next));
}
let data = unsafe { &mut (*ptr).data };
data[0..chunk.len()].copy_from_slice(chunk);
}
fn alloc_granule(&mut self) -> Result<PageOffset, Error> {
let granule = self
.alloc_from_freelist()
.or_else(|| self.alloc_from_gap())
.ok_or(Error::InsufficientVarLenSpace { need: 1, have: 0 })?;
debug_assert!(
is_granule_offset_aligned(granule),
"Allocated an unaligned var-len granule: {granule:x}",
);
self.header.num_granules += 1;
Ok(granule)
}
#[inline]
fn alloc_from_freelist(&mut self) -> Option<PageOffset> {
let free = unsafe {
self.header
.next_free
.take_freelist_head(self.var_row_data, |o| o - self.last_fixed)
}?;
self.header.freelist_len -= 1;
Some(free)
}
#[inline]
fn alloc_from_gap(&mut self) -> Option<PageOffset> {
if gap_enough_size_for_row(self.header.first, self.last_fixed, VarLenGranule::SIZE) {
self.header.first -= VarLenGranule::SIZE;
Some(self.header.first)
} else {
None
}
}
#[inline]
unsafe fn free_granule(&mut self, offset: PageOffset) {
self.header.freelist_len += 1;
self.header.num_granules -= 1;
let adjuster = self.adjuster();
unsafe {
self.header
.next_free
.prepend_freelist(self.var_row_data, offset, adjuster)
};
}
unsafe fn get_granule_ref(&self, offset: PageOffset) -> &VarLenGranule {
unsafe { get_ref(self.var_row_data, self.adjuster()(offset)) }
}
#[cold]
#[inline(never)]
unsafe fn free_blob(&self, offset: PageOffset, blob_store: &mut dyn BlobStore) -> BlobNumBytes {
assert!(!offset.is_var_len_null());
let granule = unsafe { self.get_granule_ref(offset) };
let hash = granule.blob_hash();
let blob_store_deleted_bytes = blob_store
.retrieve_blob(&hash)
.expect("failed to free var-len blob")
.len()
.into();
blob_store.free_blob(&hash).expect("failed to free var-len blob");
blob_store_deleted_bytes
}
pub unsafe fn free_object_ignore_blob(&mut self, var_len_obj: VarLenRef) {
let mut next_granule = var_len_obj.first_granule;
while !next_granule.is_var_len_null() {
let header = unsafe { self.get_granule_ref(next_granule) }.header;
unsafe {
self.free_granule(next_granule);
}
next_granule = header.next();
}
}
unsafe fn free_object(&mut self, var_len_obj: VarLenRef, blob_store: &mut dyn BlobStore) -> BlobNumBytes {
let mut blob_store_deleted_bytes = BlobNumBytes::default();
if var_len_obj.is_large_blob() {
unsafe {
blob_store_deleted_bytes = self.free_blob(var_len_obj.first_granule, blob_store);
}
}
unsafe {
self.free_object_ignore_blob(var_len_obj);
}
blob_store_deleted_bytes
}
}
pub struct GranuleOffsetIter<'vv, 'page> {
var_view: &'vv mut VarView<'page>,
next_granule: PageOffset,
}
impl GranuleOffsetIter<'_, '_> {
pub unsafe fn get_mut_data(&mut self, offset: PageOffset, start: usize) -> &mut Bytes {
let granule: &mut VarLenGranule = unsafe { get_mut(self.var_view.var_row_data, offset) };
unsafe { granule.data.as_mut_slice().get_unchecked_mut(start..) }
}
}
impl Iterator for GranuleOffsetIter<'_, '_> {
type Item = PageOffset;
fn next(&mut self) -> Option<Self::Item> {
let adjust = self.var_view.adjuster();
if self.next_granule.is_var_len_null() {
return None;
}
let ret = adjust(self.next_granule);
let granule: &VarLenGranule = unsafe { get_ref(self.var_view.var_row_data, ret) };
self.next_granule = granule.header.next();
Some(ret)
}
}
fn assert_alignment<T>(ptr: *const Byte) {
debug_assert_eq!(
ptr as usize % mem::align_of::<T>(),
0,
"Wanted a PageOffset with align 0x{:x} (for {}) but found 0x{:x}",
mem::align_of::<T>(),
std::any::type_name::<T>(),
ptr as usize,
);
}
#[inline]
pub unsafe fn get_ref<T>(row_data: &Bytes, offset: PageOffset) -> &T {
let ptr = unsafe { offset_to_ptr(row_data, offset) };
assert_alignment::<T>(ptr);
let ptr = ptr.cast::<T>();
unsafe { &*ptr }
}
#[inline]
unsafe fn get_mut<T>(row_data: &mut Bytes, offset: PageOffset) -> &mut T {
let ptr = unsafe { offset_to_ptr_mut(row_data, offset) };
assert_alignment::<T>(ptr as *const Byte);
let ptr = ptr.cast::<T>();
unsafe { &mut *ptr }
}
#[inline]
unsafe fn offset_to_ptr(row_data: &Bytes, offset: PageOffset) -> *const Byte {
debug_assert!(offset.idx() <= row_data.len());
unsafe { row_data.as_ptr().add(offset.idx()) }
}
#[inline]
unsafe fn offset_to_ptr_mut(row_data: &mut Bytes, offset: PageOffset) -> *mut Byte {
debug_assert!(offset.idx() <= row_data.len());
unsafe { row_data.as_mut_ptr().add(offset.idx()) }
}
#[inline]
fn gap_remaining_size(first_var: PageOffset, last_fixed: PageOffset) -> Size {
Size((first_var - last_fixed).0)
}
#[inline]
fn gap_enough_size_for_row(first_var: PageOffset, last_fixed: PageOffset, fixed_row_size: Size) -> bool {
gap_remaining_size(first_var, last_fixed) >= fixed_row_size
}
impl Page {
pub fn new(fixed_row_size: Size) -> Box<Self> {
Self::new_with_max_row_count(max_rows_in_page(fixed_row_size))
}
pub fn new_with_max_row_count(max_rows_in_page: usize) -> Box<Self> {
use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
let layout = Layout::new::<Page>();
let raw: *mut Page = unsafe { alloc_zeroed(layout) }.cast();
if raw.is_null() {
handle_alloc_error(layout);
}
let header = unsafe { &raw mut (*raw).header };
unsafe { header.write(PageHeader::new(max_rows_in_page)) };
unsafe { Box::from_raw(raw) }
}
pub fn num_rows(&self) -> usize {
self.header.fixed.num_rows as usize
}
#[cfg(test)]
pub fn reconstruct_num_rows(&self) -> usize {
self.header.fixed.present_rows.iter_set().count()
}
pub fn num_var_len_granules(&self) -> usize {
self.header.var.num_granules as usize
}
#[cfg(test)]
pub unsafe fn reconstruct_num_var_len_granules(
&self,
fixed_row_size: Size,
var_len_visitor: &impl VarLenMembers,
) -> usize {
self.iter_fixed_len(fixed_row_size)
.flat_map(|row| unsafe {
var_len_visitor.visit_var_len(self.get_row_data(row, fixed_row_size))
})
.flat_map(|var_len_obj| unsafe {
self.iter_var_len_object(var_len_obj.first_granule)
})
.count()
}
pub fn bytes_used_by_rows(&self, fixed_row_size: Size) -> usize {
let fixed_row_bytes = self.num_rows() * fixed_row_size.len();
let var_len_bytes = self.num_var_len_granules() * VarLenGranule::SIZE.len();
fixed_row_bytes + var_len_bytes
}
#[cfg(test)]
pub unsafe fn reconstruct_bytes_used_by_rows(
&self,
fixed_row_size: Size,
var_len_visitor: &impl VarLenMembers,
) -> usize {
let fixed_row_bytes = self.reconstruct_num_rows() * fixed_row_size.len();
let var_len_bytes = unsafe { self.reconstruct_num_var_len_granules(fixed_row_size, var_len_visitor) }
* VarLenGranule::SIZE.len();
fixed_row_bytes + var_len_bytes
}
pub fn get_row_data(&self, row: PageOffset, size: Size) -> &Bytes {
&self.row_data[row.range(size)]
}
pub fn has_row_offset(&self, fixed_row_size: Size, offset: PageOffset) -> bool {
assert_eq!(offset.idx() % fixed_row_size.len(), 0);
self.header.fixed.is_row_present(offset, fixed_row_size)
}
pub fn split_fixed_var_mut(&mut self) -> (FixedView<'_>, VarView<'_>) {
let last_fixed = self.header.fixed.last;
let (fixed_row_data, var_row_data) = self.row_data.split_at_mut(last_fixed.idx());
let fixed = FixedView {
fixed_row_data,
header: &mut self.header.fixed,
};
let var = VarView {
var_row_data,
header: &mut self.header.var,
last_fixed,
};
(fixed, var)
}
pub fn get_fixed_row_data_mut(&mut self, start: PageOffset, fixed_row_size: Size) -> &mut Bytes {
self.header.unmodified_hash = None;
&mut self.row_data[start.range(fixed_row_size)]
}
pub fn total_granules_required_for_objects(objects: &[impl AsRef<[u8]>]) -> usize {
objects
.iter()
.map(|obj| VarLenGranule::bytes_to_granules(obj.as_ref().len()).0)
.sum()
}
pub fn has_space_for_row_with_objects(&self, fixed_row_size: Size, var_len_objects: &[impl AsRef<[u8]>]) -> bool {
let num_granules_required = Self::total_granules_required_for_objects(var_len_objects);
self.has_space_for_row(fixed_row_size, num_granules_required)
}
pub fn has_space_for_row(&self, fixed_row_size: Size, num_granules: usize) -> bool {
let gap_remaining = gap_remaining_size(self.header.var.first, self.header.fixed.last);
let gap_avail_for_granules = if self.header.fixed.next_free.has() {
gap_remaining
} else {
if gap_remaining < fixed_row_size {
return false;
}
gap_remaining - fixed_row_size
};
let gap_in_granules = VarLenGranule::space_to_granules(gap_avail_for_granules);
let needed_granules_after_freelist = num_granules.saturating_sub(self.header.var.freelist_len as usize);
gap_in_granules >= needed_granules_after_freelist
}
pub fn is_full(&self, fixed_row_size: Size) -> bool {
!self.has_space_for_row(fixed_row_size, 0)
}
pub unsafe fn insert_row(
&mut self,
fixed_row: &Bytes,
var_len_objects: &[impl AsRef<[u8]>],
var_len_visitor: &impl VarLenMembers,
blob_store: &mut dyn BlobStore,
) -> Result<PageOffset, Error> {
let fixed_row_size = Size(fixed_row.len() as u16);
let fixed_len_offset = unsafe { self.alloc_fixed_len(fixed_row_size)? };
let (mut fixed, mut var) = self.split_fixed_var_mut();
let row = fixed.get_row_mut(fixed_len_offset, fixed_row_size);
row.copy_from_slice(fixed_row);
let vlr_slot_iter = unsafe { var_len_visitor.visit_var_len_mut(row) };
for (var_len_ref_slot, var_len_obj) in vlr_slot_iter.zip(var_len_objects) {
let (var_len_ref, in_blob) = var.alloc_for_slice(var_len_obj.as_ref())?;
if in_blob {
unsafe {
var.write_large_blob_hash_to_granule(blob_store, var_len_obj, var_len_ref);
}
}
*var_len_ref_slot = var_len_ref;
}
Ok(fixed_len_offset)
}
pub unsafe fn alloc_fixed_len(&mut self, fixed_row_size: Size) -> Result<PageOffset, Error> {
self.alloc_fixed_len_from_freelist(fixed_row_size)
.or_else(|| self.alloc_fixed_len_from_gap(fixed_row_size))
.ok_or(Error::InsufficientFixedLenSpace { need: fixed_row_size })
}
#[inline]
fn alloc_fixed_len_from_freelist(&mut self, fixed_row_size: Size) -> Option<PageOffset> {
let header = &mut self.header.fixed;
let free = unsafe { header.next_free.take_freelist_head(&self.row_data, |x| x) }?;
header.set_row_present(free, fixed_row_size);
self.header.unmodified_hash = None;
Some(free)
}
#[inline]
fn alloc_fixed_len_from_gap(&mut self, fixed_row_size: Size) -> Option<PageOffset> {
if gap_enough_size_for_row(self.header.var.first, self.header.fixed.last, fixed_row_size) {
self.header.unmodified_hash = None;
let ptr = self.header.fixed.last;
self.header.fixed.last += fixed_row_size;
self.header.fixed.set_row_present(ptr, fixed_row_size);
Some(ptr)
} else {
None
}
}
fn iter_fixed_len_from(&self, fixed_row_size: Size, starting_from: PageOffset) -> FixedLenRowsIter<'_> {
let idx = starting_from / fixed_row_size;
FixedLenRowsIter {
idx_iter: self.header.fixed.present_rows.iter_set_from(idx),
fixed_row_size,
}
}
pub fn iter_fixed_len(&self, fixed_row_size: Size) -> FixedLenRowsIter<'_> {
FixedLenRowsIter {
idx_iter: self.header.fixed.present_rows.iter_set(),
fixed_row_size,
}
}
pub unsafe fn iter_var_len_object(
&self,
first_granule: PageOffset,
) -> impl Clone + Iterator<Item = &VarLenGranule> {
VarLenGranulesIter {
page: self,
next_granule: first_granule,
}
}
pub unsafe fn iter_vlo_data(&self, first_granule: PageOffset) -> impl '_ + Clone + Iterator<Item = &[u8]> {
unsafe { self.iter_var_len_object(first_granule) }.map(|g| g.data())
}
pub unsafe fn delete_row(
&mut self,
fixed_row: PageOffset,
fixed_row_size: Size,
var_len_visitor: &impl VarLenMembers,
blob_store: &mut dyn BlobStore,
) -> BlobNumBytes {
self.header.unmodified_hash = None;
let (mut fixed, mut var) = self.split_fixed_var_mut();
let mut blob_store_deleted_bytes = BlobNumBytes::default();
let row = fixed.get_row(fixed_row, fixed_row_size);
let var_len_refs = unsafe { var_len_visitor.visit_var_len(row) };
for var_len_ref in var_len_refs {
blob_store_deleted_bytes += unsafe { var.free_object(*var_len_ref, blob_store) }
}
unsafe {
fixed.free(fixed_row, fixed_row_size);
}
blob_store_deleted_bytes
}
pub unsafe fn row_total_granules(
&self,
fixed_row_offset: PageOffset,
fixed_row_size: Size,
var_len_visitor: &impl VarLenMembers,
) -> usize {
let fixed_row = self.get_row_data(fixed_row_offset, fixed_row_size);
let vlr_iter = unsafe { var_len_visitor.visit_var_len(fixed_row) };
vlr_iter.copied().map(|slot| slot.granules_used()).sum()
}
pub unsafe fn copy_filter_into(
&self,
starting_from: PageOffset,
dst: &mut Page,
fixed_row_size: Size,
var_len_visitor: &impl VarLenMembers,
blob_store: &mut dyn BlobStore,
mut filter: impl FnMut(&Page, PageOffset) -> bool,
) -> ControlFlow<(), PageOffset> {
for row_offset in self
.iter_fixed_len_from(fixed_row_size, starting_from)
.filter(|o| filter(self, *o))
{
if !unsafe { self.copy_row_into(row_offset, dst, fixed_row_size, var_len_visitor, blob_store) } {
return ControlFlow::Continue(row_offset);
}
}
ControlFlow::Break(())
}
unsafe fn copy_row_into(
&self,
row_offset: PageOffset,
dst: &mut Page,
fixed_row_size: Size,
var_len_visitor: &impl VarLenMembers,
blob_store: &mut dyn BlobStore,
) -> bool {
let required_granules = unsafe { self.row_total_granules(row_offset, fixed_row_size, var_len_visitor) };
if !dst.has_space_for_row(fixed_row_size, required_granules) {
return false;
};
let src_row = self.get_row_data(row_offset, fixed_row_size);
let inserted_offset = unsafe { dst.alloc_fixed_len(fixed_row_size) }
.expect("Failed to allocate fixed-len row in dst page after checking for available space");
let (mut dst_fixed, mut dst_var) = dst.split_fixed_var_mut();
let dst_row = dst_fixed.get_row_mut(inserted_offset, fixed_row_size);
dst_row.copy_from_slice(src_row);
let src_vlr_iter = unsafe { var_len_visitor.visit_var_len(src_row) };
let target_vlr_iter = unsafe { var_len_visitor.visit_var_len_mut(dst_row) };
for (src_vlr, target_vlr_slot) in src_vlr_iter.zip(target_vlr_iter) {
let target_vlr_fixup = unsafe { self.copy_var_len_into(*src_vlr, &mut dst_var, blob_store) }
.expect("Failed to allocate var-len object in dst page after checking for available space");
*target_vlr_slot = target_vlr_fixup;
}
true
}
unsafe fn copy_var_len_into(
&self,
src_vlr: VarLenRef,
dst_var: &mut VarView<'_>,
blob_store: &mut dyn BlobStore,
) -> Result<VarLenRef, Error> {
let mut iter = unsafe { self.iter_var_len_object(src_vlr.first_granule) };
let Some(mut src_chunk) = iter.next() else {
debug_assert!(src_vlr.length_in_bytes == 0);
return Ok(VarLenRef::NULL);
};
let mut dst_chunk = dst_var.alloc_granule()?;
let copied_head = dst_chunk;
for next_src_chunk in iter {
let next_dst_chunk = dst_var.alloc_granule()?;
let data = src_chunk.data();
unsafe { dst_var.write_chunk_to_granule(data, data.len(), dst_chunk, next_dst_chunk) };
dst_chunk = next_dst_chunk;
src_chunk = next_src_chunk;
}
let data = src_chunk.data();
unsafe { dst_var.write_chunk_to_granule(data, data.len(), dst_chunk, PageOffset::VAR_LEN_NULL) };
if src_vlr.is_large_blob() {
blob_store
.clone_blob(&src_chunk.blob_hash())
.expect("blob_store could not mark hash as used");
}
Ok(VarLenRef {
first_granule: copied_head,
length_in_bytes: src_vlr.length_in_bytes,
})
}
pub fn clear(&mut self) {
self.header.clear();
}
pub unsafe fn zero_data(&mut self) {
self.row_data.fill(0);
}
pub fn reset_for(&mut self, max_rows_in_page: usize) {
self.header.reset_for(max_rows_in_page);
}
pub(super) unsafe fn set_raw(&mut self, header: PageHeader, row_data: RowData) {
self.header = header;
self.row_data = row_data;
}
#[cfg(test)]
pub(super) fn page_header_for_test(&self) -> &PageHeader {
&self.header
}
pub fn content_hash(&self) -> blake3::Hash {
let mut hasher = blake3::Hasher::new();
hasher.update(&self.row_data);
let fixed = &self.header.fixed;
let mut fixed_bytes = [0u8; 6];
fixed_bytes[0..2].copy_from_slice(&fixed.next_free.next.0.to_le_bytes());
fixed_bytes[2..4].copy_from_slice(&fixed.last.0.to_le_bytes());
fixed_bytes[4..6].copy_from_slice(&fixed.num_rows.to_le_bytes());
hasher.update(&fixed_bytes);
hasher.update(bytemuck::must_cast_slice(fixed.present_rows.storage()));
hasher.update(bytemuck::bytes_of(&self.header.var));
hasher.finalize()
}
pub fn save_content_hash(&mut self) {
let hash = self.content_hash();
self.header.unmodified_hash = Some(hash);
}
pub fn save_or_get_content_hash(&mut self) -> blake3::Hash {
self.unmodified_hash().copied().unwrap_or_else(|| {
self.save_content_hash();
self.header.unmodified_hash.unwrap()
})
}
pub fn unmodified_hash(&self) -> Option<&blake3::Hash> {
self.header.unmodified_hash.as_ref()
}
}
pub struct FixedLenRowsIter<'page> {
idx_iter: IterSet<'page>,
fixed_row_size: Size,
}
impl Iterator for FixedLenRowsIter<'_> {
type Item = PageOffset;
fn next(&mut self) -> Option<Self::Item> {
self.idx_iter
.next()
.map(|idx| PageOffset(idx as u16 * self.fixed_row_size.0))
}
}
#[derive(Clone, Copy)]
struct VarLenGranulesIter<'page> {
page: &'page Page,
next_granule: PageOffset,
}
impl<'page> Iterator for VarLenGranulesIter<'page> {
type Item = &'page VarLenGranule;
fn next(&mut self) -> Option<Self::Item> {
if self.next_granule.is_var_len_null() {
return None;
}
let granule: &VarLenGranule = unsafe { get_ref(&self.page.row_data, self.next_granule) };
self.next_granule = granule.header.next();
Some(granule)
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{blob_store::NullBlobStore, page_pool::PagePool, var_len::AlignedVarLenOffsets};
use proptest::{collection::vec, prelude::*};
use spacetimedb_sats::bsatn;
use spacetimedb_sats::layout::row_size_for_type;
fn u64_row_size() -> Size {
let fixed_row_size = row_size_for_type::<u64>();
assert_eq!(fixed_row_size.len(), 8);
fixed_row_size
}
const U64_VL_VISITOR: AlignedVarLenOffsets<'_> = AlignedVarLenOffsets::from_offsets(&[]);
fn u64_var_len_visitor() -> &'static AlignedVarLenOffsets<'static> {
&U64_VL_VISITOR
}
fn insert_u64(page: &mut Page, val: u64) -> PageOffset {
let val_slice = val.to_le_bytes();
unsafe { page.insert_row(&val_slice, &[] as &[&[u8]], u64_var_len_visitor(), &mut NullBlobStore) }
.expect("Failed to insert first row")
}
fn insert_u64_drop(page: &mut Page, val: u64) {
insert_u64(page, val);
}
fn read_u64(page: &Page, offset: PageOffset) -> u64 {
let row = page.get_row_data(offset, u64_row_size());
u64::from_le_bytes(row.try_into().unwrap())
}
fn data_sub_n_vlg(n: usize) -> usize {
PageOffset::PAGE_END.idx() - (VarLenGranule::SIZE * n).len()
}
pub(crate) fn hash_unmodified_save_get(page: &mut Page) -> blake3::Hash {
assert_eq!(page.header.unmodified_hash, None);
page.save_content_hash();
page.header.unmodified_hash.unwrap()
}
#[test]
fn insert_one_u64() {
let mut page = Page::new(u64_row_size());
let hash = hash_unmodified_save_get(&mut page);
let val: u64 = 0xa5a5_a5a5_a5a5_a5a5;
let offset = insert_u64(&mut page, val);
assert_eq!(offset.idx(), 0);
let row_val = read_u64(&page, offset);
assert_eq!(row_val, val);
assert_ne!(hash, hash_unmodified_save_get(&mut page));
}
fn insert_while(
page: &mut Page,
mut next_val: u64,
fixed_row_size: Size,
vl_num: usize,
mut insert: impl FnMut(&mut Page, u64),
) -> u64 {
while page.has_space_for_row(fixed_row_size, vl_num) {
insert(page, next_val);
next_val += 1;
}
next_val
}
#[test]
fn fill_then_iter_fixed_len_u64() {
let mut page = Page::new(u64_row_size());
let last_val = insert_while(&mut page, 0, u64_row_size(), 0, insert_u64_drop);
assert_eq!(last_val, (PageOffset::PAGE_END / u64_row_size()) as u64);
for (row_idx, expected_val) in page.iter_fixed_len(u64_row_size()).zip(0..last_val) {
let row_val = read_u64(&page, row_idx);
assert_eq!(
row_val, expected_val,
"row_val {row_val:x} /= expected_val {expected_val:x}"
);
}
}
#[test]
fn fill_delete_iter_fixed_len_u64() {
let mut page = Page::new(u64_row_size());
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let mut odds: Vec<PageOffset> = Vec::new();
let last_val = insert_while(&mut page, 2, u64_row_size(), 0, |page, val| {
let offset = insert_u64(page, val);
if val % 2 == 1 {
odds.push(offset);
}
});
let hash_pre_del = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_del);
for row_offset in odds {
unsafe { page.delete_row(row_offset, u64_row_size(), u64_var_len_visitor(), &mut NullBlobStore) };
}
let hash_pre_iter = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_iter);
assert_ne!(hash_pre_del, hash_pre_iter);
for (row_offset, expected_val) in page.iter_fixed_len(u64_row_size()).zip((2..last_val).step_by(2)) {
let found_val = read_u64(&page, row_offset);
assert_eq!(found_val, expected_val);
}
assert_eq!(page.header.unmodified_hash, Some(hash_pre_iter));
}
#[test]
fn reuse_fixed_len_space() {
let mut page = Page::new(u64_row_size());
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let offset_0 = insert_u64(&mut page, 0xa5a5a5a5_a5a5a5a5);
assert_eq!(offset_0.idx(), 0);
let offset_1 = insert_u64(&mut page, 0xbeefbeef_beefbeef);
assert_eq!(offset_1, u64_row_size());
assert_eq!(page.header.fixed.last, u64_row_size() * 2);
let hash_pre_del = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_del);
unsafe { page.delete_row(offset_0, u64_row_size(), u64_var_len_visitor(), &mut NullBlobStore) };
assert_eq!(page.header.fixed.last, u64_row_size() * 2);
let hash_pre_ins2 = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_ins2);
assert_ne!(hash_pre_del, hash_pre_ins2);
let offset_0_again = insert_u64(&mut page, 0xffffffff_ffffffff);
assert_eq!(offset_0_again.idx(), 0);
assert_eq!(offset_0.idx(), offset_0_again.idx());
assert_eq!(page.header.fixed.last, u64_row_size() * 2);
let hash_post_ins2 = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_post_ins2);
assert_ne!(hash_pre_del, hash_post_ins2);
assert_ne!(hash_pre_ins2, hash_post_ins2);
}
const STR_ROW_SIZE: Size = row_size_for_type::<VarLenRef>();
const _: () = assert!(STR_ROW_SIZE.len() == mem::size_of::<VarLenRef>());
const STR_VL_VISITOR: AlignedVarLenOffsets<'_> = AlignedVarLenOffsets::from_offsets(&[0]);
fn str_var_len_visitor() -> &'static AlignedVarLenOffsets<'static> {
&STR_VL_VISITOR
}
fn insert_str(page: &mut Page, data: &[u8]) -> PageOffset {
let fixed_len_data = [0u8; STR_ROW_SIZE.len()];
unsafe { page.insert_row(&fixed_len_data, &[data], str_var_len_visitor(), &mut NullBlobStore) }
.expect("Failed to insert row")
}
fn read_str_ref(page: &Page, offset: PageOffset) -> VarLenRef {
*unsafe { get_ref(&page.row_data, offset) }
}
#[test]
fn insert_empty_str() {
let mut page = Page::new(STR_ROW_SIZE);
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let offset = insert_str(&mut page, &[]);
let extracted = read_str_ref(&page, offset);
let mut granules_iter = unsafe { page.iter_var_len_object(extracted.first_granule) };
assert!(granules_iter.next().is_none());
drop(granules_iter);
assert_ne!(hash_pre_ins, hash_unmodified_save_get(&mut page));
}
proptest! {
#[test]
fn insert_one_short_str(data in vec(any::<u8>(), 1..VarLenGranule::DATA_SIZE)) {
let mut page = Page::new(STR_ROW_SIZE);
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let offset = insert_str(&mut page, &data);
let hash_pre_iter = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_iter);
let extracted = read_str_ref(&page, offset);
let mut data_iter = unsafe { page.iter_vlo_data(extracted.first_granule) };
let (first, next) = (data_iter.next(), data_iter.next());
assert_eq!(first, Some(&*data));
assert_eq!(next, None);
assert_eq!(hash_pre_iter, page.header.unmodified_hash.unwrap());
}
#[test]
fn insert_one_long_str(data in vec(any::<u8>(), (VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD / 2)..VarLenGranule::OBJECT_SIZE_BLOB_THRESHOLD)) {
let mut page = Page::new(STR_ROW_SIZE);
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let offset = insert_str(&mut page, &data);
let hash_post_ins = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_post_ins);
let extracted = read_str_ref(&page, offset);
let mut data_iter = unsafe { page.iter_vlo_data(extracted.first_granule) };
let mut chunks_iter = data.chunks(VarLenGranule::DATA_SIZE);
for (i, (data, chunk)) in (&mut data_iter).zip(&mut chunks_iter).enumerate() {
assert_eq!(
data,
chunk,
"Chunk {i} does not match. Left is found, right is expected.",
);
}
assert!(data_iter.next().is_none());
assert!(chunks_iter.next().is_none());
assert_eq!(hash_post_ins, page.header.unmodified_hash.unwrap());
}
}
#[test]
fn reuse_var_len_space_no_fragmentation_concerns() {
let data_0 = b"Hello, world!";
let data_1 = b"How goes life?";
let data_2 = b"Glad to hear it.";
let mut page = Page::new(STR_ROW_SIZE);
let offset_0 = insert_str(&mut page, data_0);
let offset_1 = insert_str(&mut page, data_1);
assert_eq!(page.header.var.first.idx(), data_sub_n_vlg(2));
assert_ne!(offset_0.idx(), offset_1.idx());
let var_len_0 = read_str_ref(&page, offset_0);
assert_eq!(var_len_0.length_in_bytes as usize, data_0.len());
assert_eq!(var_len_0.first_granule.idx(), data_sub_n_vlg(1));
let var_len_1 = read_str_ref(&page, offset_1);
assert_eq!(var_len_1.length_in_bytes as usize, data_1.len());
assert_eq!(var_len_1.first_granule.idx(), data_sub_n_vlg(2));
let hash_pre_del = hash_unmodified_save_get(&mut page);
unsafe { page.delete_row(offset_0, STR_ROW_SIZE, str_var_len_visitor(), &mut NullBlobStore) };
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let offset_2 = insert_str(&mut page, data_2);
let hash_post_ins = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_del, hash_pre_ins);
assert_ne!(hash_pre_del, hash_post_ins);
assert_ne!(hash_pre_ins, hash_post_ins);
assert_eq!(page.header.var.first.idx(), data_sub_n_vlg(2));
assert_eq!(offset_0.idx(), offset_2.idx());
let var_len_2 = read_str_ref(&page, offset_2);
assert_eq!(var_len_2.length_in_bytes as usize, data_2.len());
assert_eq!(var_len_2.first_granule.idx(), var_len_0.first_granule.idx());
}
#[test]
fn free_var_len_obj_multiple_granules() {
let mut page = Page::new(STR_ROW_SIZE);
let data_0 = [0xa5u8].repeat(VarLenGranule::DATA_SIZE * 4);
let offset_0 = insert_str(&mut page, &data_0);
let var_len_0 = read_str_ref(&page, offset_0);
let granules_0 = unsafe { page.iter_var_len_object(var_len_0.first_granule) }
.map(|granule| granule as *const VarLenGranule as usize)
.collect::<Vec<_>>();
assert_eq!(granules_0.len(), 4);
assert_eq!(page.header.var.first.idx(), data_sub_n_vlg(4));
unsafe { page.delete_row(offset_0, STR_ROW_SIZE, str_var_len_visitor(), &mut NullBlobStore) };
let data_1 = [0xffu8].repeat(VarLenGranule::DATA_SIZE * 4);
let offset_1 = insert_str(&mut page, &data_1);
let var_len_1 = read_str_ref(&page, offset_1);
let granules_1 = unsafe { page.iter_var_len_object(var_len_1.first_granule) }
.map(|granule| granule as *const VarLenGranule as usize)
.collect::<Vec<_>>();
assert_eq!(granules_1.len(), 4);
for granule in granules_1.iter().copied() {
assert!(granules_0.iter().copied().any(|other_granule| other_granule == granule));
}
assert_eq!(page.header.var.first.idx(), data_sub_n_vlg(4));
}
#[test]
fn reuse_var_len_space_avoid_fragmentation() {
let data_0 = &[0xa5u8];
let data_1 = &[0xffu8];
let data_2 = [0x11u8].repeat(VarLenGranule::DATA_SIZE + 1);
let data_2 = data_2.as_ref();
let mut page = Page::new(STR_ROW_SIZE);
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let offset_0 = insert_str(&mut page, data_0);
let _offset_1 = insert_str(&mut page, data_1);
assert_eq!(page.header.var.first.idx(), data_sub_n_vlg(2));
let hash_pre_del = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_del);
unsafe { page.delete_row(offset_0, STR_ROW_SIZE, str_var_len_visitor(), &mut NullBlobStore) };
let hash_post_del = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_post_del);
assert_ne!(hash_pre_del, hash_post_del);
let offset_2 = insert_str(&mut page, data_2);
assert_eq!(page.header.var.first.idx(), data_sub_n_vlg(3));
let hash_post_ins2 = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_post_ins2);
assert_ne!(hash_pre_del, hash_post_ins2);
assert_ne!(hash_post_del, hash_post_ins2);
let var_len_2 = read_str_ref(&page, offset_2);
let mut data_iter = unsafe { page.iter_vlo_data(var_len_2.first_granule) };
let mut chunks_iter = data_2.chunks(VarLenGranule::DATA_SIZE);
for (i, (data, chunk)) in (&mut data_iter).zip(&mut chunks_iter).enumerate() {
assert_eq!(
data, chunk,
"Chunk {i} does not match. Left is found, right is expected.",
);
}
assert!(data_iter.next().is_none());
assert!(chunks_iter.next().is_none());
}
fn check_u64_in_str(page: &Page, row_idx: PageOffset, expected_val: u64) {
let vlr = read_str_ref(page, row_idx);
let mut var_len_iter = unsafe { page.iter_vlo_data(vlr.first_granule) };
let data = var_len_iter.next().unwrap();
assert!(var_len_iter.next().is_none());
assert_eq!(data.len(), mem::size_of::<u64>());
let val = u64::from_le_bytes(data.try_into().unwrap());
assert_eq!(val, expected_val);
}
#[test]
fn fill_then_iter_var_len_str() {
let mut page = Page::new(STR_ROW_SIZE);
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let last_val = insert_while(&mut page, 0, STR_ROW_SIZE, 1, |page, val| {
insert_str(page, &val.to_le_bytes());
});
let hash_pre_iter = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_iter);
let size_per_row = STR_ROW_SIZE + VarLenGranule::SIZE;
assert_eq!(last_val, (PageOffset::PAGE_END / size_per_row) as u64);
for (row_idx, expected_val) in page.iter_fixed_len(STR_ROW_SIZE).zip(0..last_val) {
check_u64_in_str(&page, row_idx, expected_val);
}
assert_eq!(hash_pre_iter, page.header.unmodified_hash.unwrap());
}
#[test]
fn fill_delete_iter_var_len_str() {
let mut page = Page::new(STR_ROW_SIZE);
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let mut odds = Vec::new();
let last_val = insert_while(&mut page, 0, STR_ROW_SIZE, 1, |page, val| {
let offset = insert_str(page, &val.to_le_bytes());
if val % 2 == 1 {
odds.push(offset);
}
});
let size_per_row = STR_ROW_SIZE + VarLenGranule::SIZE;
let num_rows_inserted = (PageOffset::PAGE_END / size_per_row) as u64;
assert_eq!(last_val, num_rows_inserted);
let hash_pre_del = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_del);
for row_offset in odds {
unsafe { page.delete_row(row_offset, STR_ROW_SIZE, str_var_len_visitor(), &mut NullBlobStore) };
}
let hash_pre_iter = hash_unmodified_save_get(&mut page);
assert_ne!(hash_pre_ins, hash_pre_iter);
assert_ne!(hash_pre_del, hash_pre_iter);
let num_rows_retained = num_rows_inserted.div_ceil(2);
let num_rows_removed = num_rows_inserted / 2;
assert_eq!(page.header.fixed.num_rows as u64, num_rows_retained);
assert_eq!(page.header.var.freelist_len as u64, num_rows_removed);
for (row_idx, expected_val) in page.iter_fixed_len(STR_ROW_SIZE).zip((0..last_val).step_by(2)) {
check_u64_in_str(&page, row_idx, expected_val);
}
assert_eq!(hash_pre_iter, page.header.unmodified_hash.unwrap());
}
#[test]
fn serde_round_trip_whole_page() {
let pool = PagePool::new_for_test();
let mut page = Page::new(u64_row_size());
let hash_pre_ins = hash_unmodified_save_get(&mut page);
let ser_pre_ins = bsatn::to_vec(&page).unwrap();
let de_pre_ins = pool.take_deserialize_from(&ser_pre_ins).unwrap();
assert_eq!(de_pre_ins.content_hash(), hash_pre_ins);
assert_eq!(de_pre_ins.header.fixed.num_rows, 0);
assert!(de_pre_ins.header.fixed.present_rows == page.header.fixed.present_rows);
let offsets = (0..64)
.map(|val| insert_u64(&mut page, val))
.collect::<Vec<PageOffset>>();
let hash_ins = hash_unmodified_save_get(&mut page);
let ser_ins = bsatn::to_vec(&page).unwrap();
let de_ins = pool.take_deserialize_from(&ser_ins).unwrap();
assert_eq!(de_ins.content_hash(), hash_ins);
assert_eq!(de_ins.header.fixed.num_rows, 64);
assert!(de_ins.header.fixed.present_rows == page.header.fixed.present_rows);
assert_eq!(
de_ins.iter_fixed_len(u64_row_size()).collect::<Vec<PageOffset>>(),
offsets
);
let offsets = offsets
.into_iter()
.enumerate()
.filter_map(|(i, offset)| {
if i % 2 == 0 {
unsafe { page.delete_row(offset, u64_row_size(), u64_var_len_visitor(), &mut NullBlobStore) };
None
} else {
Some(offset)
}
})
.collect::<Vec<PageOffset>>();
let hash_del = hash_unmodified_save_get(&mut page);
let ser_del = bsatn::to_vec(&page).unwrap();
let de_del = pool.take_deserialize_from(&ser_del).unwrap();
assert_eq!(de_del.content_hash(), hash_del);
assert_eq!(de_del.header.fixed.num_rows, 32);
assert!(de_del.header.fixed.present_rows == page.header.fixed.present_rows);
assert_eq!(
de_del.iter_fixed_len(u64_row_size()).collect::<Vec<PageOffset>>(),
offsets
);
}
}