use super::{
bflatn_from::read_tag,
indexes::{Bytes, PageOffset},
page::Page,
row_hash::read_from_bytes,
static_layout::StaticLayout,
util::range_move,
var_len::VarLenRef,
};
use spacetimedb_sats::layout::{align_to, AlgebraicTypeLayout, HasLayout, ProductTypeLayoutView, RowTypeLayout};
/// Checks whether two rows stored in pages are equal.
///
/// The row at `fixed_offset_a` in `page_a` is compared against
/// the row at `fixed_offset_b` in `page_b`, with both rows interpreted at the layout `ty`.
///
/// When `static_layout` is `Some`, the comparison is delegated to `StaticLayout::eq`
/// on the fixed-length bytes of both rows.
/// Otherwise the product layout of `ty` is walked field by field, starting at offset `0`
/// of each row, so that only bytes belonging to values are compared
/// and var-len references are followed into their respective pages.
///
/// # Safety
///
/// NOTE(review): the exact contract is dictated by the unsafe callees
/// (`eq_product` and `StaticLayout::eq`), whose requirements are not visible here.
/// Presumably the caller must guarantee that:
///
/// - `fixed_offset_a` / `fixed_offset_b` refer to rows in `page_a` / `page_b`
///   that are valid and properly aligned for `ty`,
/// - every `VarLenRef` stored in those rows is usable with its own page's `iter_vlo_data`,
/// - `static_layout`, when provided, describes the same row type as `ty`.
///
/// TODO confirm against the callee documentation.
pub unsafe fn eq_row_in_page(
    page_a: &Page,
    page_b: &Page,
    fixed_offset_a: PageOffset,
    fixed_offset_b: PageOffset,
    ty: &RowTypeLayout,
    static_layout: Option<&StaticLayout>,
) -> bool {
    // Views of the fixed-length parts of both rows, paired with their pages.
    let a = BytesPage::new(page_a, fixed_offset_a, ty);
    let b = BytesPage::new(page_b, fixed_offset_b, ty);

    // Fast path: a static layout was supplied, so let it compare the fixed bytes directly.
    if let Some(layout) = static_layout {
        // SAFETY: forwarded from this function's caller contract (see `# Safety`).
        return unsafe { layout.eq(a.bytes, b.bytes) };
    }

    // General path: walk the row type, starting at the beginning of both rows.
    let mut ctx = EqCtx { a, b, curr_offset: 0 };
    // SAFETY: forwarded from this function's caller contract (see `# Safety`).
    unsafe { eq_product(&mut ctx, ty.product()) }
}
/// The fixed-length bytes of one row together with the page that the row lives in.
///
/// The page is kept alongside the bytes so that var-len data
/// referenced from the row can be looked up in the same page.
#[derive(Clone, Copy)]
pub(crate) struct BytesPage<'page> {
/// The row data obtained from `page.get_row_data(offset, ty.size())` in [`BytesPage::new`].
pub(crate) bytes: &'page Bytes,
/// The page that `bytes` was taken from.
pub(crate) page: &'page Page,
}
impl<'page> BytesPage<'page> {
    /// Builds the view of the row located at `offset` in `page`.
    ///
    /// The row data is fetched with `page.get_row_data`,
    /// requesting `ty.size()` bytes starting at `offset`.
    pub(crate) fn new(page: &'page Page, offset: PageOffset, ty: &RowTypeLayout) -> Self {
        Self {
            bytes: page.get_row_data(offset, ty.size()),
            page,
        }
    }
}
/// The state threaded through a row comparison.
///
/// Both rows are read at the same `curr_offset`,
/// so a single cursor is shared between them.
#[derive(Clone, Copy)]
struct EqCtx<'page_a, 'page_b> {
/// The bytes and page of the first row.
a: BytesPage<'page_a>,
/// The bytes and page of the second row.
b: BytesPage<'page_b>,
/// The offset into both `a.bytes` and `b.bytes` at which the next value is read.
curr_offset: usize,
}
/// Compares the product values that start at `ctx.curr_offset` in both rows, field by field.
///
/// Before each field is compared, `ctx.curr_offset` is set to
/// the product's starting offset plus that field's offset within the product.
/// The walk stops at the first field that differs,
/// in which case `false` is returned and the remaining fields are not visited.
/// An empty product compares as equal.
///
/// # Safety
///
/// NOTE(review): presumably both rows must hold a valid value of type `ty`
/// at `ctx.curr_offset`, as needed by `eq_value` for every field — TODO confirm.
unsafe fn eq_product(ctx: &mut EqCtx<'_, '_>, ty: ProductTypeLayoutView<'_>) -> bool {
    // All field offsets are relative to where the product begins.
    let product_start = ctx.curr_offset;

    for field in ty.elements.iter() {
        ctx.curr_offset = product_start + field.offset as usize;

        // SAFETY: forwarded from this function's caller contract (see `# Safety`).
        let field_matches = unsafe { eq_value(ctx, &field.ty) };
        if !field_matches {
            return false;
        }
    }

    true
}
/// Compares the values of type `ty` stored at `ctx.curr_offset` in both rows.
///
/// - Sums: the tags of both rows are read and must match.
///   The parent cursor is then moved past the whole sum (`ty.size()` bytes),
///   and the variant payload is compared through a separate context
///   positioned at the payload's offset within the sum.
///   When the tags differ, `false` is returned without moving the cursor.
/// - Products: delegated to `eq_product`.
/// - Primitives: `ty.size()` bytes are compared byte-wise via `eq_byte_array`.
/// - Strings and other var-len types: delegated to `eq_vlo`.
///
/// In debug builds, the cursor is asserted to be aligned for `ty`.
///
/// # Safety
///
/// NOTE(review): presumably both rows must hold a valid value of type `ty`
/// at `ctx.curr_offset`, and any var-len references within must be usable
/// with their respective pages — TODO confirm.
unsafe fn eq_value(ctx: &mut EqCtx<'_, '_>, ty: &AlgebraicTypeLayout) -> bool {
    debug_assert_eq!(
        ctx.curr_offset,
        align_to(ctx.curr_offset, ty.align()),
        "curr_offset {} insufficiently aligned for type {:?}",
        ctx.curr_offset,
        ty
    );

    match ty {
        AlgebraicTypeLayout::Sum(sum_ty) => {
            let sum_start = ctx.curr_offset;

            // Both rows must hold the same variant.
            let (tag_a, payload_ty) = read_tag(ctx.a.bytes, sum_ty, sum_start);
            let (tag_b, _) = read_tag(ctx.b.bytes, sum_ty, sum_start);
            if tag_a != tag_b {
                return false;
            }

            // Where the payload of the shared variant lives.
            let payload_offset = sum_start + sum_ty.offset_of_variant_data(tag_a);
            // The parent cursor skips the entire sum, regardless of the variant's size.
            ctx.curr_offset = sum_start + sum_ty.size();

            // The payload gets its own cursor so that the parent's stays past the sum.
            let mut payload_ctx = EqCtx {
                a: ctx.a,
                b: ctx.b,
                curr_offset: payload_offset,
            };
            // SAFETY: forwarded from this function's caller contract (see `# Safety`).
            unsafe { eq_value(&mut payload_ctx, payload_ty) }
        }
        AlgebraicTypeLayout::Product(product_ty) => {
            // SAFETY: forwarded from this function's caller contract (see `# Safety`).
            unsafe { eq_product(ctx, product_ty.view()) }
        }
        // Primitives are compared as raw bytes of the type's size.
        &AlgebraicTypeLayout::Bool
        | &AlgebraicTypeLayout::I8
        | &AlgebraicTypeLayout::U8
        | &AlgebraicTypeLayout::I16
        | &AlgebraicTypeLayout::U16
        | &AlgebraicTypeLayout::I32
        | &AlgebraicTypeLayout::U32
        | &AlgebraicTypeLayout::I64
        | &AlgebraicTypeLayout::U64
        | &AlgebraicTypeLayout::I128
        | &AlgebraicTypeLayout::U128
        | &AlgebraicTypeLayout::I256
        | &AlgebraicTypeLayout::U256
        | &AlgebraicTypeLayout::F32
        | &AlgebraicTypeLayout::F64 => eq_byte_array(ctx, ty.size()),
        // Var-len values are stored out of line and compared through their references.
        &AlgebraicTypeLayout::String | AlgebraicTypeLayout::VarLen(_) => {
            // SAFETY: forwarded from this function's caller contract (see `# Safety`).
            unsafe { eq_vlo(ctx) }
        }
    }
}
/// Compares the var-len objects referenced from `ctx.curr_offset` in both rows.
///
/// A `VarLenRef` is read from each row at the same offset.
/// The objects are unequal when their `length_in_bytes` differ.
/// Otherwise the data yielded by `iter_vlo_data` for each page,
/// starting at the respective `first_granule`, is compared pairwise.
///
/// Only the read from row `b` is handed `ctx.curr_offset` itself;
/// the read from row `a` operates on a scratch copy of the cursor.
///
/// # Safety
///
/// NOTE(review): presumably both rows must hold a valid `VarLenRef` at `ctx.curr_offset`
/// whose `first_granule` is acceptable to `iter_vlo_data` of its own page — TODO confirm.
unsafe fn eq_vlo(ctx: &mut EqCtx<'_, '_>) -> bool {
    // Row `a` is read through a throwaway cursor so that the shared cursor
    // is only touched once, by the read from row `b` below.
    let mut scratch_offset = ctx.curr_offset;
    // SAFETY: forwarded from this function's caller contract (see `# Safety`).
    let ref_a = unsafe { read_from_bytes::<VarLenRef>(ctx.a.bytes, &mut scratch_offset) };
    // SAFETY: forwarded from this function's caller contract (see `# Safety`).
    let ref_b = unsafe { read_from_bytes::<VarLenRef>(ctx.b.bytes, &mut ctx.curr_offset) };

    // Objects of different lengths cannot be equal; skip touching the var-len data.
    if ref_a.length_in_bytes != ref_b.length_in_bytes {
        return false;
    }

    // SAFETY: forwarded from this function's caller contract (see `# Safety`).
    let chunks_a = unsafe { ctx.a.page.iter_vlo_data(ref_a.first_granule) };
    // SAFETY: forwarded from this function's caller contract (see `# Safety`).
    let chunks_b = unsafe { ctx.b.page.iter_vlo_data(ref_b.first_granule) };

    chunks_a
        .zip(chunks_b)
        .all(|(chunk_a, chunk_b)| chunk_a == chunk_b)
}
/// Compares `len` bytes at `ctx.curr_offset` in both rows and advances the cursor by `len`.
///
/// The cursor is advanced whether or not the bytes are equal.
///
/// # Panics
///
/// Panics if the range `range_move(0..len, ctx.curr_offset)` is out of bounds for either row.
fn eq_byte_array(ctx: &mut EqCtx<'_, '_>, len: usize) -> bool {
    let start = ctx.curr_offset;

    let lhs = &ctx.a.bytes[range_move(0..len, start)];
    let rhs = &ctx.b.bytes[range_move(0..len, start)];
    let bytes_match = lhs == rhs;

    // Step over the compared bytes so the next value is read after them.
    ctx.curr_offset = start + len;
    bytes_match
}
#[cfg(test)]
mod test {
use crate::{blob_store::NullBlobStore, page_pool::PagePool};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType};
/// Rows holding the same sum variant must compare equal
/// even when the row slots previously stored different data.
///
/// NOTE(review): presumably `assert_eq!` on the row refs ends up in `eq_row_in_page`,
/// and the `(U8, U32)` variant has padding after the `U8` — confirm against `RowRef` and the layout code.
#[test]
fn sum_with_variant_with_distinct_layout() {
// A row type with a single sum field whose two variants are
// a `U64` and a product of `U8` and `U32`.
let ty = ProductType::from([AlgebraicType::sum([
AlgebraicType::U64, AlgebraicType::product([AlgebraicType::U8, AlgebraicType::U32]), ])]);
let pool = PagePool::new_for_test();
let bs = &mut NullBlobStore;
let mut table_a = crate::table::test::table(ty.clone());
let mut table_b = crate::table::test::table(ty);
// Table `a`: insert `u64::MAX` under tag 0, remember where it was stored, then delete it.
let a0 = product![AlgebraicValue::sum(0, u64::MAX.into())];
let (_, a0_rr) = table_a.insert(&pool, bs, &a0).unwrap();
let a0_ptr = a0_rr.pointer();
assert!(table_a.delete(bs, a0_ptr, |_| {}).is_some());
// Table `b`: same procedure, but with an alternating bit pattern,
// so the bytes written into the slot differ from those in table `a`.
let b0 = 0b01010101_01010101_01010101_01010101_01010101_01010101_01010101_01010101u64;
let b0 = product![AlgebraicValue::sum(0, b0.into())];
let (_, b0_rr) = table_b.insert(&pool, bs, &b0).unwrap();
let b0_ptr = b0_rr.pointer();
assert!(table_b.delete(bs, b0_ptr, |_| {}).is_some());
// Insert the same tag-1 value into both tables.
let v1 = product![AlgebraicValue::sum(1, product![0u8, 0u32].into())];
let (_, a1_rr) = table_a.insert(&pool, bs, &v1).unwrap();
// A second blob store binding is used for table `b`;
// presumably because `a1_rr` keeps the first one borrowed — verify against `insert`'s signature.
let bs = &mut NullBlobStore;
let (_, b1_rr) = table_b.insert(&pool, bs, &v1).unwrap();
// The new rows must have landed where the deleted rows used to be.
assert_eq!(a0_ptr, a1_rr.pointer());
assert_eq!(b0_ptr, b1_rr.pointer());
// Whatever bytes were left behind by the deleted rows must not affect equality.
assert_eq!(a1_rr, b1_rr);
}
}