use std::sync::atomic::{AtomicU64, Ordering};
use fsqlite_types::{CommitSeq, PageSize, SchemaEpoch, TxnId};
use xxhash_rust::xxh3::xxh3_64;
use crate::lifecycle::MvccError;
/// Byte offsets (and a few lengths) of every field in the fixed-size
/// shared-memory header.
///
/// All multi-byte fields are stored little-endian via the `read_*`/`write_*`
/// helpers at the bottom of this file. u32 fields sit on 4-byte boundaries and
/// u64 fields on 8-byte boundaries (checked by the alignment tests) so they
/// can be accessed atomically when the header is mapped into memory.
mod offsets {
    /// 8-byte magic tag (`FSQLSHM\0`) at the very start of the header.
    pub const MAGIC: usize = 0;
    pub const MAGIC_LEN: usize = 8;
    /// Layout version (u32).
    pub const VERSION: usize = 8;
    /// Database page size (u32).
    pub const PAGE_SIZE: usize = 12;
    /// Maximum number of transaction slots (u32).
    pub const MAX_TXN_SLOTS: usize = 16;
    /// 4 bytes of padding so the u64 fields below are 8-byte aligned.
    pub const ALIGN0: usize = 20;
    // --- mutable u64 counters (backed by atomics at runtime) ---
    pub const NEXT_TXN_ID: usize = 24;
    pub const SNAPSHOT_SEQ: usize = 32;
    pub const COMMIT_SEQ: usize = 40;
    pub const SCHEMA_EPOCH: usize = 48;
    pub const ECS_EPOCH: usize = 56;
    pub const GC_HORIZON: usize = 64;
    // --- serialized-writer indicator (txn id, pid/gen, birth, lease) ---
    pub const SERIALIZED_WRITER_TXN_ID: usize = 72;
    pub const SERIALIZED_WRITER_PID_AND_GEN: usize = 80;
    pub const SERIALIZED_WRITER_PID_BIRTH: usize = 88;
    pub const SERIALIZED_WRITER_LEASE_EXPIRY: usize = 96;
    // --- immutable layout geometry (covered by LAYOUT_CHECKSUM) ---
    pub const LOCK_TABLE_OFFSET: usize = 104;
    pub const WITNESS_OFFSET: usize = 112;
    pub const TXN_SLOT_OFFSET: usize = 120;
    pub const COMMITTED_READERS_OFFSET: usize = 128;
    pub const COMMITTED_READERS_BYTES: usize = 136;
    /// xxh3-64 checksum over the immutable layout parameters.
    pub const LAYOUT_CHECKSUM: usize = 144;
    /// Reserved tail padding.
    pub const _PADDING: usize = 152;
    pub const _PADDING_LEN: usize = 64;
    /// Total header size: 152 + 64 = 216 bytes.
    pub const HEADER_SIZE: usize = 216;
}
/// Magic tag identifying an fsqlite shared-memory region.
const MAGIC: [u8; 8] = *b"FSQLSHM\0";
/// Current header layout version; `open` rejects anything else.
const LAYOUT_VERSION: u32 = 1;
/// Fallback slot count used when a caller passes `max_txn_slots == 0`.
const DEFAULT_MAX_TXN_SLOTS: u32 = 128;
/// A consistent view of the seqlock-protected snapshot fields, read together
/// by [`SharedMemoryLayout::load_consistent_snapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ShmSnapshot {
    pub commit_seq: CommitSeq,
    pub schema_epoch: SchemaEpoch,
    pub ecs_epoch: u64,
}
/// In-memory representation of the shared-memory header.
///
/// The plain fields are immutable layout geometry fixed at creation time and
/// protected by `layout_checksum`; the `AtomicU64` fields mirror the mutable
/// cross-process coordination state.
pub struct SharedMemoryLayout {
    page_size: PageSize,
    max_txn_slots: u32,
    // Byte offsets/sizes of the variable regions that follow the header
    // (covered by `layout_checksum`).
    lock_table_offset: u64,
    witness_offset: u64,
    txn_slot_offset: u64,
    committed_readers_offset: u64,
    committed_readers_bytes: u64,
    layout_checksum: u64,
    // Mutable coordination state.
    next_txn_id: AtomicU64,
    // Seqlock word: odd while a snapshot publish is in progress.
    snapshot_seq: AtomicU64,
    commit_seq: AtomicU64,
    schema_epoch: AtomicU64,
    ecs_epoch: AtomicU64,
    gc_horizon: AtomicU64,
    // Serialized-writer indicator: txn id (0 = none), pid packed with a
    // generation counter in the high 32 bits, pid birth time, and lease
    // expiry in epoch seconds (0 = no lease).
    serialized_writer_txn_id: AtomicU64,
    serialized_writer_pid_and_gen: AtomicU64,
    serialized_writer_pid_birth: AtomicU64,
    serialized_writer_lease_expiry: AtomicU64,
}
impl SharedMemoryLayout {
    /// Total size in bytes of the fixed header described by `offsets`.
    pub const HEADER_SIZE: usize = offsets::HEADER_SIZE;

    /// Build a fresh layout for a new shared-memory region.
    ///
    /// `max_txn_slots == 0` is replaced with `DEFAULT_MAX_TXN_SLOTS`. The
    /// variable-size regions are packed directly after the header in order:
    /// lock table (64 bytes/slot), witness table (128 bytes/slot), txn slots
    /// (128 bytes/slot), committed-readers bitmap (1 bit/slot, rounded up to
    /// whole bytes). Mutable counters start at their initial values
    /// (`next_txn_id == 1`, everything else 0) and the checksum covers only
    /// the immutable geometry.
    #[must_use]
    pub fn new(page_size: PageSize, max_txn_slots: u32) -> Self {
        // Treat 0 as "use the default" rather than an error.
        let max_txn_slots = if max_txn_slots == 0 {
            DEFAULT_MAX_TXN_SLOTS
        } else {
            max_txn_slots
        };
        let lock_table_offset = Self::HEADER_SIZE as u64;
        let lock_table_size = u64::from(max_txn_slots) * 64;
        let witness_offset = lock_table_offset + lock_table_size;
        let witness_size = u64::from(max_txn_slots) * 128;
        let txn_slot_offset = witness_offset + witness_size;
        let txn_slot_size = u64::from(max_txn_slots) * 128;
        let committed_readers_offset = txn_slot_offset + txn_slot_size;
        // One bit per slot, rounded up to whole bytes.
        let committed_readers_bytes = u64::from(max_txn_slots.div_ceil(8));
        let checksum = Self::compute_checksum_from_parts(
            page_size,
            max_txn_slots,
            lock_table_offset,
            witness_offset,
            txn_slot_offset,
            committed_readers_offset,
            committed_readers_bytes,
        );
        Self {
            page_size,
            max_txn_slots,
            lock_table_offset,
            witness_offset,
            txn_slot_offset,
            committed_readers_offset,
            committed_readers_bytes,
            layout_checksum: checksum,
            // Txn id 0 is the "none" sentinel, so allocation starts at 1.
            next_txn_id: AtomicU64::new(1),
            snapshot_seq: AtomicU64::new(0),
            commit_seq: AtomicU64::new(0),
            schema_epoch: AtomicU64::new(0),
            ecs_epoch: AtomicU64::new(0),
            gc_horizon: AtomicU64::new(0),
            serialized_writer_txn_id: AtomicU64::new(0),
            serialized_writer_pid_and_gen: AtomicU64::new(0),
            serialized_writer_pid_birth: AtomicU64::new(0),
            serialized_writer_lease_expiry: AtomicU64::new(0),
        }
    }
/// Parse and validate an existing header from `buf`.
///
/// Validation order: buffer size, magic, layout version, page size, then the
/// layout checksum recomputed over the immutable geometry. The mutable
/// counters are loaded verbatim into fresh atomics without validation.
///
/// # Errors
///
/// `ShmTooSmall`, `ShmBadMagic`, `ShmVersionMismatch`, `ShmInvalidPageSize`,
/// or `ShmChecksumMismatch`, in that order of precedence.
pub fn open(buf: &[u8]) -> Result<Self, MvccError> {
    if buf.len() < Self::HEADER_SIZE {
        return Err(MvccError::ShmTooSmall);
    }
    if buf[offsets::MAGIC..offsets::MAGIC + offsets::MAGIC_LEN] != MAGIC {
        return Err(MvccError::ShmBadMagic);
    }
    let version = read_u32(buf, offsets::VERSION);
    if version != LAYOUT_VERSION {
        return Err(MvccError::ShmVersionMismatch);
    }
    let page_size_raw = read_u32(buf, offsets::PAGE_SIZE);
    let page_size = PageSize::new(page_size_raw).ok_or(MvccError::ShmInvalidPageSize)?;
    let max_txn_slots = read_u32(buf, offsets::MAX_TXN_SLOTS);
    let lock_table_offset = read_u64(buf, offsets::LOCK_TABLE_OFFSET);
    let witness_offset = read_u64(buf, offsets::WITNESS_OFFSET);
    let txn_slot_offset = read_u64(buf, offsets::TXN_SLOT_OFFSET);
    let committed_readers_offset = read_u64(buf, offsets::COMMITTED_READERS_OFFSET);
    let committed_readers_bytes = read_u64(buf, offsets::COMMITTED_READERS_BYTES);
    let stored_checksum = read_u64(buf, offsets::LAYOUT_CHECKSUM);
    // Recompute from the decoded fields; corruption of the geometry (or of
    // the stored checksum itself) fails the open.
    let computed = Self::compute_checksum_from_parts(
        page_size,
        max_txn_slots,
        lock_table_offset,
        witness_offset,
        txn_slot_offset,
        committed_readers_offset,
        committed_readers_bytes,
    );
    if stored_checksum != computed {
        return Err(MvccError::ShmChecksumMismatch);
    }
    // Mutable counters: accepted as-is.
    let next_txn_id = read_u64(buf, offsets::NEXT_TXN_ID);
    let snapshot_seq = read_u64(buf, offsets::SNAPSHOT_SEQ);
    let commit_seq = read_u64(buf, offsets::COMMIT_SEQ);
    let schema_epoch = read_u64(buf, offsets::SCHEMA_EPOCH);
    let ecs_epoch = read_u64(buf, offsets::ECS_EPOCH);
    let gc_horizon = read_u64(buf, offsets::GC_HORIZON);
    let sw_txn_id = read_u64(buf, offsets::SERIALIZED_WRITER_TXN_ID);
    let sw_pid_and_gen = read_u64(buf, offsets::SERIALIZED_WRITER_PID_AND_GEN);
    let sw_pid_birth = read_u64(buf, offsets::SERIALIZED_WRITER_PID_BIRTH);
    let sw_lease = read_u64(buf, offsets::SERIALIZED_WRITER_LEASE_EXPIRY);
    Ok(Self {
        page_size,
        max_txn_slots,
        lock_table_offset,
        witness_offset,
        txn_slot_offset,
        committed_readers_offset,
        committed_readers_bytes,
        layout_checksum: stored_checksum,
        next_txn_id: AtomicU64::new(next_txn_id),
        snapshot_seq: AtomicU64::new(snapshot_seq),
        commit_seq: AtomicU64::new(commit_seq),
        schema_epoch: AtomicU64::new(schema_epoch),
        ecs_epoch: AtomicU64::new(ecs_epoch),
        gc_horizon: AtomicU64::new(gc_horizon),
        serialized_writer_txn_id: AtomicU64::new(sw_txn_id),
        serialized_writer_pid_and_gen: AtomicU64::new(sw_pid_and_gen),
        serialized_writer_pid_birth: AtomicU64::new(sw_pid_birth),
        serialized_writer_lease_expiry: AtomicU64::new(sw_lease),
    })
}
/// Serialize the whole header into a freshly allocated `HEADER_SIZE`-byte
/// buffer (little-endian fields, padding left zeroed).
///
/// Atomic fields are sampled once each with `Acquire` loads, so the result is
/// a point-in-time copy rather than a live view.
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
    let mut buf = vec![0u8; Self::HEADER_SIZE];
    buf[offsets::MAGIC..offsets::MAGIC + offsets::MAGIC_LEN].copy_from_slice(&MAGIC);
    write_u32(&mut buf, offsets::VERSION, LAYOUT_VERSION);
    write_u32(&mut buf, offsets::PAGE_SIZE, self.page_size.get());
    write_u32(&mut buf, offsets::MAX_TXN_SLOTS, self.max_txn_slots);
    write_u32(&mut buf, offsets::ALIGN0, 0);
    // Mutable counters: sample each atomic exactly once.
    let atomic_fields: [(usize, &AtomicU64); 10] = [
        (offsets::NEXT_TXN_ID, &self.next_txn_id),
        (offsets::SNAPSHOT_SEQ, &self.snapshot_seq),
        (offsets::COMMIT_SEQ, &self.commit_seq),
        (offsets::SCHEMA_EPOCH, &self.schema_epoch),
        (offsets::ECS_EPOCH, &self.ecs_epoch),
        (offsets::GC_HORIZON, &self.gc_horizon),
        (
            offsets::SERIALIZED_WRITER_TXN_ID,
            &self.serialized_writer_txn_id,
        ),
        (
            offsets::SERIALIZED_WRITER_PID_AND_GEN,
            &self.serialized_writer_pid_and_gen,
        ),
        (
            offsets::SERIALIZED_WRITER_PID_BIRTH,
            &self.serialized_writer_pid_birth,
        ),
        (
            offsets::SERIALIZED_WRITER_LEASE_EXPIRY,
            &self.serialized_writer_lease_expiry,
        ),
    ];
    for (off, field) in atomic_fields {
        write_u64(&mut buf, off, field.load(Ordering::Acquire));
    }
    // Immutable geometry and its checksum.
    let plain_fields: [(usize, u64); 6] = [
        (offsets::LOCK_TABLE_OFFSET, self.lock_table_offset),
        (offsets::WITNESS_OFFSET, self.witness_offset),
        (offsets::TXN_SLOT_OFFSET, self.txn_slot_offset),
        (
            offsets::COMMITTED_READERS_OFFSET,
            self.committed_readers_offset,
        ),
        (
            offsets::COMMITTED_READERS_BYTES,
            self.committed_readers_bytes,
        ),
        (offsets::LAYOUT_CHECKSUM, self.layout_checksum),
    ];
    for (off, val) in plain_fields {
        write_u64(&mut buf, off, val);
    }
    buf
}
/// Open a seqlock publish window by moving `snapshot_seq` from even to odd.
///
/// If the sequence is already odd (a previous publish was interrupted), this
/// returns without incrementing so the following `end_snapshot_publish`
/// repairs the seqlock instead of corrupting it further.
pub fn begin_snapshot_publish(&self) {
    let mut observed = self.snapshot_seq.load(Ordering::Acquire);
    // Attempt the even -> odd transition; on CAS failure retry with the
    // freshly observed value, and bail out as soon as an odd value is seen.
    while observed % 2 == 0 {
        match self.snapshot_seq.compare_exchange_weak(
            observed,
            observed + 1,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return,
            Err(current) => observed = current,
        }
    }
}
/// Close a publish window: bump the seqlock back to even, making the newly
/// stored snapshot values visible to readers.
pub fn end_snapshot_publish(&self) {
    self.snapshot_seq.fetch_add(1, Ordering::Release);
}
/// Read `commit_seq`, `schema_epoch`, and `ecs_epoch` as one consistent
/// group using the seqlock read protocol: spin while the sequence is odd
/// (publish in progress), read the three fields, and accept them only if the
/// sequence word did not change across the reads.
#[must_use]
pub fn load_consistent_snapshot(&self) -> ShmSnapshot {
    loop {
        let seq1 = self.snapshot_seq.load(Ordering::Acquire);
        if seq1 % 2 == 1 {
            // A publish is in flight; wait for it to finish.
            std::hint::spin_loop();
            continue;
        }
        let cs = self.commit_seq.load(Ordering::Acquire);
        let se = self.schema_epoch.load(Ordering::Acquire);
        let ee = self.ecs_epoch.load(Ordering::Acquire);
        let seq2 = self.snapshot_seq.load(Ordering::Acquire);
        if seq1 == seq2 {
            // No publish started or finished in between: reads are coherent.
            return ShmSnapshot {
                commit_seq: CommitSeq::new(cs),
                schema_epoch: SchemaEpoch::new(se),
                ecs_epoch: ee,
            };
        }
        // Sequence moved: a writer raced us; retry.
        std::hint::spin_loop();
    }
}
/// Publish a new snapshot of `schema_epoch`, `ecs_epoch`, and `commit_seq`
/// under the seqlock so readers see the three values change atomically.
pub fn publish_snapshot(
    &self,
    commit_seq: CommitSeq,
    schema_epoch: SchemaEpoch,
    ecs_epoch: u64,
) {
    self.begin_snapshot_publish();
    self.schema_epoch
        .store(schema_epoch.get(), Ordering::Release);
    self.ecs_epoch.store(ecs_epoch, Ordering::Release);
    self.commit_seq.store(commit_seq.get(), Ordering::Release);
    self.end_snapshot_publish();
}
pub fn reconcile(
&self,
durable_commit_seq: CommitSeq,
durable_schema_epoch: SchemaEpoch,
durable_ecs_epoch: u64,
) {
self.begin_snapshot_publish();
self.schema_epoch
.store(durable_schema_epoch.get(), Ordering::Release);
self.ecs_epoch.store(durable_ecs_epoch, Ordering::Release);
self.commit_seq
.store(durable_commit_seq.get(), Ordering::Release);
self.end_snapshot_publish();
}
/// Try to register as the exclusive serialized writer.
///
/// Succeeds only when the indicator word is 0 (no current writer). On
/// success the owner's pid (with a generation counter bumped in the high 32
/// bits), pid birth time, and lease expiry are recorded. Returns `false` if
/// another writer already holds the indicator.
///
/// # Panics
///
/// Panics if `writer_txn_id_raw == 0`; 0 is the "no writer" sentinel.
pub fn acquire_serialized_writer(
    &self,
    writer_txn_id_raw: u64,
    pid: u32,
    pid_birth: u64,
    lease_expiry_epoch_secs: u64,
) -> bool {
    assert_ne!(
        writer_txn_id_raw, 0,
        "serialized writer txn id must be non-zero"
    );
    // CAS 0 -> txn id is the sole ownership transfer; everything below runs
    // only on the winning caller.
    if self
        .serialized_writer_txn_id
        .compare_exchange(0, writer_txn_id_raw, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        return false;
    }
    // Non-atomic read-modify-write of the generation counter is fine here
    // because only the CAS winner executes this path.
    let old_packed = self.serialized_writer_pid_and_gen.load(Ordering::Acquire);
    let old_gen = (old_packed >> 32) as u32;
    let new_gen = old_gen.wrapping_add(1);
    let new_packed = u64::from(pid) | (u64::from(new_gen) << 32);
    self.serialized_writer_pid_and_gen
        .store(new_packed, Ordering::Release);
    self.serialized_writer_pid_birth
        .store(pid_birth, Ordering::Release);
    self.serialized_writer_lease_expiry
        .store(lease_expiry_epoch_secs, Ordering::Release);
    true
}
/// Release the serialized-writer indicator if (and only if) it is still held
/// by `writer_txn_id_raw`; returns `false` otherwise (already released, or
/// taken over by someone else).
pub fn release_serialized_writer(&self, writer_txn_id_raw: u64) -> bool {
    // Snapshot the auxiliary fields *before* giving up ownership so the
    // conditional clears below only wipe values this writer observed.
    let my_packed = self.serialized_writer_pid_and_gen.load(Ordering::Acquire);
    let my_birth = self.serialized_writer_pid_birth.load(Ordering::Acquire);
    let my_lease = self.serialized_writer_lease_expiry.load(Ordering::Acquire);
    if self
        .serialized_writer_txn_id
        .compare_exchange(writer_txn_id_raw, 0, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        return false;
    }
    // Best-effort cleanup: each CAS silently loses if a new writer has
    // already overwritten the field, leaving the successor's data intact.
    let _ = self.serialized_writer_pid_and_gen.compare_exchange(
        my_packed,
        0,
        Ordering::AcqRel,
        Ordering::Relaxed,
    );
    let _ = self.serialized_writer_pid_birth.compare_exchange(
        my_birth,
        0,
        Ordering::AcqRel,
        Ordering::Relaxed,
    );
    let _ = self.serialized_writer_lease_expiry.compare_exchange(
        my_lease,
        0,
        Ordering::AcqRel,
        Ordering::Relaxed,
    );
    true
}
/// Current serialized writer's txn id, or `None` when the indicator word is
/// 0 (no writer registered).
#[must_use]
pub fn check_serialized_writer(&self) -> Option<TxnId> {
    TxnId::new(self.serialized_writer_txn_id.load(Ordering::Acquire))
}
/// Check whether a serialized writer currently excludes us, clearing a stale
/// indicator if possible.
///
/// `process_alive(pid, pid_birth)` must report whether the given process
/// (identified by pid plus birth time, presumably to guard against pid
/// reuse — confirm with callers) still exists.
///
/// # Errors
///
/// Returns `MvccError::Busy` while a live serialized writer holds the
/// indicator; `Ok(())` when there is none or a stale one was cleared.
pub fn check_serialized_writer_exclusion(
    &self,
    now_epoch_secs: u64,
    process_alive: impl Fn(u32, u64) -> bool,
) -> Result<(), MvccError> {
    // Production path: no test hook.
    let mut noop = |_writer_txn_id_raw: u64| {};
    self.check_serialized_writer_exclusion_with_hook(now_epoch_secs, process_alive, &mut noop)
}
/// Core of the stale-writer check. `on_stale_before_cas` is a test hook
/// invoked after the indicator has been judged stale but before the clearing
/// CAS (used by tests to inject a racing writer).
fn check_serialized_writer_exclusion_with_hook<F>(
    &self,
    now_epoch_secs: u64,
    process_alive: impl Fn(u32, u64) -> bool,
    on_stale_before_cas: &mut F,
) -> Result<(), MvccError>
where
    F: FnMut(u64),
{
    loop {
        let writer_txn_id_raw = self.serialized_writer_txn_id.load(Ordering::Acquire);
        if writer_txn_id_raw == 0 {
            // No serialized writer registered.
            return Ok(());
        }
        let packed_pid = self.serialized_writer_pid_and_gen.load(Ordering::Acquire);
        let pid = packed_pid as u32;
        let pid_birth = self.serialized_writer_pid_birth.load(Ordering::Acquire);
        let lease_expiry = self.serialized_writer_lease_expiry.load(Ordering::Acquire);
        // Liveness policy: a set lease is authoritative (now >= expiry means
        // stale — the boundary is inclusive); with no lease, fall back to
        // the process-liveness probe.
        let lease_set = lease_expiry != 0;
        let lease_expired = lease_set && now_epoch_secs >= lease_expiry;
        let process_dead = pid != 0 && pid_birth != 0 && !process_alive(pid, pid_birth);
        let writer_live = if lease_set {
            !lease_expired
        } else {
            !process_dead
        };
        if writer_live {
            tracing::warn!(
                writer_txn_id_raw,
                pid,
                pid_birth,
                lease_expiry,
                "serialized writer active: concurrent writer excluded"
            );
            return Err(MvccError::Busy);
        }
        // The fields above were read one at a time; spin briefly and re-check
        // that they still belong together (not torn by a concurrent
        // acquire/release) before acting on them.
        let mut is_torn = false;
        for _ in 0..10 {
            std::hint::spin_loop();
            if self.serialized_writer_txn_id.load(Ordering::Acquire) != writer_txn_id_raw
                || self.serialized_writer_pid_and_gen.load(Ordering::Acquire) != packed_pid
            {
                is_torn = true;
                break;
            }
        }
        if is_torn {
            continue;
        }
        tracing::debug!(
            writer_txn_id_raw,
            pid,
            pid_birth,
            lease_expiry,
            lease_expired,
            process_dead,
            "serialized writer indicator appears stale; attempting CAS clear"
        );
        on_stale_before_cas(writer_txn_id_raw);
        if self
            .serialized_writer_txn_id
            .compare_exchange(writer_txn_id_raw, 0, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            // Clear the auxiliary fields only if they still hold the stale
            // values we observed; a racing new writer keeps its own data.
            let _ = self.serialized_writer_pid_and_gen.compare_exchange(
                packed_pid,
                0,
                Ordering::AcqRel,
                Ordering::Relaxed,
            );
            let _ = self.serialized_writer_pid_birth.compare_exchange(
                pid_birth,
                0,
                Ordering::AcqRel,
                Ordering::Relaxed,
            );
            let _ = self.serialized_writer_lease_expiry.compare_exchange(
                lease_expiry,
                0,
                Ordering::AcqRel,
                Ordering::Relaxed,
            );
            tracing::info!(
                writer_txn_id_raw,
                "cleared stale serialized writer indicator via CAS"
            );
            return Ok(());
        }
        // CAS lost: a new writer appeared; loop and evaluate it afresh.
    }
}
/// Current commit sequence (single-field read; use
/// `load_consistent_snapshot` for a seqlock-consistent multi-field read).
#[must_use]
pub fn load_commit_seq(&self) -> CommitSeq {
    CommitSeq::new(self.commit_seq.load(Ordering::Acquire))
}
/// Current GC horizon commit sequence.
#[must_use]
pub fn load_gc_horizon(&self) -> CommitSeq {
    CommitSeq::new(self.gc_horizon.load(Ordering::Acquire))
}
/// Publish a new GC horizon.
pub fn store_gc_horizon(&self, horizon: CommitSeq) {
    self.gc_horizon.store(horizon.get(), Ordering::Release);
}
/// Allocate the next monotonically increasing transaction id.
///
/// Returns `None` once the id space (`TxnId::MAX_RAW`) is exhausted; ids are
/// never reused. Safe to call concurrently.
pub fn alloc_txn_id(&self) -> Option<TxnId> {
    // `fetch_update` retries the CAS loop for us; the closure rejects values
    // past the id-space limit (and a hypothetical u64 overflow) by returning
    // `None`, which surfaces as `Err` and therefore `None` overall.
    let claimed = self
        .next_txn_id
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
            if current > TxnId::MAX_RAW {
                None
            } else {
                current.checked_add(1)
            }
        })
        .ok()?;
    TxnId::new(claimed)
}
/// Current schema epoch (single-field read; not seqlock-consistent).
#[must_use]
pub fn load_schema_epoch(&self) -> SchemaEpoch {
    SchemaEpoch::new(self.schema_epoch.load(Ordering::Acquire))
}
/// Current ECS epoch (single-field read; not seqlock-consistent).
#[must_use]
pub fn load_ecs_epoch(&self) -> u64 {
    self.ecs_epoch.load(Ordering::Acquire)
}
/// Page size this region was created with.
#[must_use]
pub fn page_size(&self) -> PageSize {
    self.page_size
}
/// Number of transaction slots.
#[must_use]
pub fn max_txn_slots(&self) -> u32 {
    self.max_txn_slots
}
/// Byte offset of the lock-table region.
#[must_use]
pub fn lock_table_offset(&self) -> u64 {
    self.lock_table_offset
}
/// Byte offset of the witness region.
#[must_use]
pub fn witness_offset(&self) -> u64 {
    self.witness_offset
}
/// Byte offset of the txn-slot region.
#[must_use]
pub fn txn_slot_offset(&self) -> u64 {
    self.txn_slot_offset
}
/// Byte offset of the committed-readers bitmap.
#[must_use]
pub fn committed_readers_offset(&self) -> u64 {
    self.committed_readers_offset
}
/// Size in bytes of the committed-readers bitmap.
#[must_use]
pub fn committed_readers_bytes(&self) -> u64 {
    self.committed_readers_bytes
}
/// Checksum over the immutable layout parameters.
#[must_use]
pub fn layout_checksum(&self) -> u64 {
    self.layout_checksum
}
/// xxh3-64 over the immutable layout parameters (magic, version, page size,
/// slot count, region offsets/sizes), serialized little-endian.
///
/// Mutable counters are deliberately excluded so the checksum stays valid
/// for the lifetime of the region (see `test_checksum_immutable_only`).
fn compute_checksum_from_parts(
    page_size: PageSize,
    max_txn_slots: u32,
    lock_table_offset: u64,
    witness_offset: u64,
    txn_slot_offset: u64,
    committed_readers_offset: u64,
    committed_readers_bytes: u64,
) -> u64 {
    let mut data = Vec::with_capacity(64);
    data.extend_from_slice(&MAGIC);
    data.extend_from_slice(&LAYOUT_VERSION.to_le_bytes());
    data.extend_from_slice(&page_size.get().to_le_bytes());
    data.extend_from_slice(&max_txn_slots.to_le_bytes());
    data.extend_from_slice(&lock_table_offset.to_le_bytes());
    data.extend_from_slice(&witness_offset.to_le_bytes());
    data.extend_from_slice(&txn_slot_offset.to_le_bytes());
    data.extend_from_slice(&committed_readers_offset.to_le_bytes());
    data.extend_from_slice(&committed_readers_bytes.to_le_bytes());
    xxh3_64(&data)
}
}
// Manual `Debug`: atomics are sampled with `Relaxed` (diagnostics only) and
// the serialized-writer fields are omitted (`finish_non_exhaustive`).
impl std::fmt::Debug for SharedMemoryLayout {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SharedMemoryLayout")
            .field("page_size", &self.page_size)
            .field("max_txn_slots", &self.max_txn_slots)
            .field("commit_seq", &self.commit_seq.load(Ordering::Relaxed))
            .field("schema_epoch", &self.schema_epoch.load(Ordering::Relaxed))
            .field("gc_horizon", &self.gc_horizon.load(Ordering::Relaxed))
            .field("layout_checksum", &self.layout_checksum)
            .finish_non_exhaustive()
    }
}
/// Load a little-endian `u32` from `buf[offset..offset + 4]`.
///
/// Panics if `buf` is shorter than `offset + 4`.
fn read_u32(buf: &[u8], offset: usize) -> u32 {
    let mut word = [0u8; 4];
    word.copy_from_slice(&buf[offset..offset + 4]);
    u32::from_le_bytes(word)
}
/// Load a little-endian `u64` from `buf[offset..offset + 8]`.
///
/// Panics if `buf` is shorter than `offset + 8`.
fn read_u64(buf: &[u8], offset: usize) -> u64 {
    let mut word = [0u8; 8];
    word.copy_from_slice(&buf[offset..offset + 8]);
    u64::from_le_bytes(word)
}
/// Store `val` into `buf[offset..offset + 4]` as little-endian bytes.
///
/// Panics if `buf` is shorter than `offset + 4`.
fn write_u32(buf: &mut [u8], offset: usize, val: u32) {
    let le = val.to_le_bytes();
    buf[offset..offset + le.len()].copy_from_slice(&le);
}
/// Store `val` into `buf[offset..offset + 8]` as little-endian bytes.
///
/// Panics if `buf` is shorter than `offset + 8`.
fn write_u64(buf: &mut [u8], offset: usize, val: u64) {
    let le = val.to_le_bytes();
    buf[offset..offset + le.len()].copy_from_slice(&le);
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
// Region ordering: header < lock table < witness < txn slots < readers bitmap.
#[test]
fn test_default_layout() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 128);
    assert_eq!(layout.page_size(), PageSize::DEFAULT);
    assert_eq!(layout.max_txn_slots(), 128);
    assert!(layout.lock_table_offset() >= SharedMemoryLayout::HEADER_SIZE as u64);
    assert!(layout.witness_offset() > layout.lock_table_offset());
    assert!(layout.txn_slot_offset() > layout.witness_offset());
    assert!(layout.committed_readers_offset() > layout.txn_slot_offset());
}
// The header constant and the serialized form must agree on 216 bytes.
#[test]
fn test_header_size_is_216() {
    assert_eq!(SharedMemoryLayout::HEADER_SIZE, 216);
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let bytes = layout.to_bytes();
    assert_eq!(bytes.len(), 216);
}
// to_bytes -> open must preserve every field, mutable and immutable alike.
#[test]
fn test_roundtrip_serialization() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 256);
    layout.publish_snapshot(CommitSeq::new(42), SchemaEpoch::new(7), 99);
    layout.store_gc_horizon(CommitSeq::new(10));
    let bytes = layout.to_bytes();
    let restored = SharedMemoryLayout::open(&bytes).unwrap();
    assert_eq!(restored.page_size(), layout.page_size());
    assert_eq!(restored.max_txn_slots(), layout.max_txn_slots());
    assert_eq!(restored.lock_table_offset(), layout.lock_table_offset());
    assert_eq!(restored.witness_offset(), layout.witness_offset());
    assert_eq!(restored.txn_slot_offset(), layout.txn_slot_offset());
    assert_eq!(
        restored.committed_readers_offset(),
        layout.committed_readers_offset()
    );
    assert_eq!(
        restored.committed_readers_bytes(),
        layout.committed_readers_bytes()
    );
    assert_eq!(restored.layout_checksum(), layout.layout_checksum());
    assert_eq!(
        restored.load_commit_seq().get(),
        layout.load_commit_seq().get()
    );
    assert_eq!(
        restored.load_schema_epoch().get(),
        layout.load_schema_epoch().get()
    );
    assert_eq!(restored.load_ecs_epoch(), layout.load_ecs_epoch());
    assert_eq!(
        restored.load_gc_horizon().get(),
        layout.load_gc_horizon().get()
    );
}
// Corrupting the magic byte must be detected on open.
#[test]
fn test_open_bad_magic() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let mut bytes = layout.to_bytes();
    bytes[0] = b'X';
    assert_eq!(
        SharedMemoryLayout::open(&bytes).unwrap_err(),
        MvccError::ShmBadMagic
    );
}
// An unknown layout version must be rejected.
#[test]
fn test_open_bad_version() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let mut bytes = layout.to_bytes();
    write_u32(&mut bytes, offsets::VERSION, 99);
    assert_eq!(
        SharedMemoryLayout::open(&bytes).unwrap_err(),
        MvccError::ShmVersionMismatch
    );
}
// Tampering with a checksummed field must fail checksum validation.
#[test]
fn test_open_bad_checksum() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let mut bytes = layout.to_bytes();
    write_u32(&mut bytes, offsets::MAX_TXN_SLOTS, 999);
    assert_eq!(
        SharedMemoryLayout::open(&bytes).unwrap_err(),
        MvccError::ShmChecksumMismatch
    );
}
// u64 header fields must be 8-byte aligned for atomic access.
#[test]
fn test_atomic_u64_offsets_divisible_by_8() {
    let u64_offsets = [
        offsets::NEXT_TXN_ID,
        offsets::SNAPSHOT_SEQ,
        offsets::COMMIT_SEQ,
        offsets::SCHEMA_EPOCH,
        offsets::ECS_EPOCH,
        offsets::GC_HORIZON,
        offsets::SERIALIZED_WRITER_TXN_ID,
        offsets::SERIALIZED_WRITER_PID_AND_GEN,
        offsets::SERIALIZED_WRITER_PID_BIRTH,
        offsets::SERIALIZED_WRITER_LEASE_EXPIRY,
        offsets::LOCK_TABLE_OFFSET,
        offsets::WITNESS_OFFSET,
        offsets::TXN_SLOT_OFFSET,
        offsets::COMMITTED_READERS_OFFSET,
        offsets::COMMITTED_READERS_BYTES,
        offsets::LAYOUT_CHECKSUM,
    ];
    for &off in &u64_offsets {
        assert_eq!(off % 8, 0, "offset {off} not 8-byte aligned");
    }
}
// u32 header fields must be 4-byte aligned.
#[test]
fn test_atomic_u32_offsets_divisible_by_4() {
    let u32_offsets = [
        offsets::VERSION,
        offsets::PAGE_SIZE,
        offsets::MAX_TXN_SLOTS,
        offsets::ALIGN0,
    ];
    for &off in &u32_offsets {
        assert_eq!(off % 4, 0, "offset {off} not 4-byte aligned");
    }
}
// Publishing snapshots must not perturb the layout checksum.
#[test]
fn test_checksum_immutable_only() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let cksum1 = layout.layout_checksum();
    layout.publish_snapshot(CommitSeq::new(999), SchemaEpoch::new(888), 777);
    let cksum2 = SharedMemoryLayout::compute_checksum_from_parts(
        layout.page_size(),
        layout.max_txn_slots(),
        layout.lock_table_offset(),
        layout.witness_offset(),
        layout.txn_slot_offset(),
        layout.committed_readers_offset(),
        layout.committed_readers_bytes(),
    );
    assert_eq!(
        cksum1, cksum2,
        "checksum must only depend on immutable fields"
    );
}
// Identical parameters always hash to the same checksum.
#[test]
fn test_checksum_deterministic() {
    let a = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let b = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    assert_eq!(a.layout_checksum(), b.layout_checksum());
}
// Changing any layout parameter must change the checksum.
#[test]
fn test_checksum_different_params_differ() {
    let a = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let b = SharedMemoryLayout::new(PageSize::DEFAULT, 128);
    assert_ne!(
        a.layout_checksum(),
        b.layout_checksum(),
        "different max_txn_slots must produce different checksums"
    );
    let c = SharedMemoryLayout::new(PageSize::new(8192).unwrap(), 64);
    assert_ne!(
        a.layout_checksum(),
        c.layout_checksum(),
        "different page_size must produce different checksums"
    );
}
// begin/end must toggle the seqlock odd then back to even.
#[test]
fn test_seqlock_begin_end_publish() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    assert_eq!(layout.snapshot_seq.load(Ordering::Relaxed), 0);
    layout.begin_snapshot_publish();
    assert_eq!(
        layout.snapshot_seq.load(Ordering::Relaxed) % 2,
        1,
        "after begin, seq must be odd"
    );
    layout.end_snapshot_publish();
    assert_eq!(
        layout.snapshot_seq.load(Ordering::Relaxed) % 2,
        0,
        "after end, seq must be even"
    );
    assert_eq!(layout.snapshot_seq.load(Ordering::Relaxed), 2);
}
// A quiescent layout returns exactly what was last published.
#[test]
fn test_load_consistent_snapshot() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.publish_snapshot(CommitSeq::new(5), SchemaEpoch::new(3), 17);
    let snap = layout.load_consistent_snapshot();
    assert_eq!(snap.commit_seq, CommitSeq::new(5));
    assert_eq!(snap.schema_epoch, SchemaEpoch::new(3));
    assert_eq!(snap.ecs_epoch, 17);
}
// A concurrent reader must never observe a torn snapshot.
#[test]
fn test_seqlock_threaded_retry() {
    let layout = Arc::new(SharedMemoryLayout::new(PageSize::DEFAULT, 64));
    let layout2 = Arc::clone(&layout);
    let writer = thread::spawn(move || {
        for i in 1..=1000_u64 {
            layout2.publish_snapshot(CommitSeq::new(i), SchemaEpoch::new(i * 2), i * 3);
        }
    });
    let reader_layout = Arc::clone(&layout);
    let reader = thread::spawn(move || {
        let mut reads = 0_u64;
        while reads < 5000 {
            let snap = reader_layout.load_consistent_snapshot();
            let cs = snap.commit_seq.get();
            if cs > 0 {
                assert_eq!(
                    snap.schema_epoch.get(),
                    cs * 2,
                    "torn read: schema_epoch mismatch at cs={cs}"
                );
                assert_eq!(
                    snap.ecs_epoch,
                    cs * 3,
                    "torn read: ecs_epoch mismatch at cs={cs}"
                );
            }
            reads += 1;
        }
    });
    writer.join().unwrap();
    reader.join().unwrap();
}
// A seqlock left odd by a crashed publisher is repaired by reconcile.
#[test]
fn test_seqlock_crash_repair() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.begin_snapshot_publish();
    assert_eq!(layout.snapshot_seq.load(Ordering::Relaxed) % 2, 1);
    layout.reconcile(CommitSeq::new(10), SchemaEpoch::new(5), 3);
    assert_eq!(
        layout.snapshot_seq.load(Ordering::Relaxed) % 2,
        0,
        "reconcile must repair odd snapshot_seq"
    );
    let snap = layout.load_consistent_snapshot();
    assert_eq!(snap.commit_seq, CommitSeq::new(10));
    assert_eq!(snap.schema_epoch, SchemaEpoch::new(5));
    assert_eq!(snap.ecs_epoch, 3);
}
// Commit seq and schema epoch publish together in one seqlock window.
#[test]
fn test_seqlock_ddl_ordering() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.publish_snapshot(CommitSeq::new(100), SchemaEpoch::new(50), 25);
    let snap = layout.load_consistent_snapshot();
    assert_eq!(snap.commit_seq, CommitSeq::new(100));
    assert_eq!(snap.schema_epoch, SchemaEpoch::new(50));
    assert_eq!(snap.ecs_epoch, 25);
}
// ecs_epoch rides in the same seqlock-protected group.
#[test]
fn test_seqlock_ecs_epoch_included() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.publish_snapshot(CommitSeq::new(1), SchemaEpoch::new(1), 42);
    let snap = layout.load_consistent_snapshot();
    assert_eq!(
        snap.ecs_epoch, 42,
        "ecs_epoch must be included in seqlock-protected snapshot"
    );
}
// Reconcile may move shm state backward to match durable state.
#[test]
fn test_reconcile_clamp_ahead() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.publish_snapshot(CommitSeq::new(100), SchemaEpoch::new(50), 30);
    layout.reconcile(CommitSeq::new(80), SchemaEpoch::new(40), 20);
    let snap = layout.load_consistent_snapshot();
    assert_eq!(snap.commit_seq, CommitSeq::new(80));
    assert_eq!(snap.schema_epoch, SchemaEpoch::new(40));
    assert_eq!(snap.ecs_epoch, 20);
}
// Reconcile may also advance shm state forward.
#[test]
fn test_reconcile_advance_behind() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.publish_snapshot(CommitSeq::new(10), SchemaEpoch::new(5), 3);
    layout.reconcile(CommitSeq::new(50), SchemaEpoch::new(25), 15);
    let snap = layout.load_consistent_snapshot();
    assert_eq!(snap.commit_seq, CommitSeq::new(50));
    assert_eq!(snap.schema_epoch, SchemaEpoch::new(25));
    assert_eq!(snap.ecs_epoch, 15);
}
// Reconcile must leave the seqlock even, even if it started odd.
#[test]
fn test_reconcile_repair_odd_seq() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    layout.begin_snapshot_publish();
    let seq_before = layout.snapshot_seq.load(Ordering::Relaxed);
    assert_eq!(seq_before % 2, 1);
    layout.reconcile(CommitSeq::new(1), SchemaEpoch::new(1), 1);
    let seq_after = layout.snapshot_seq.load(Ordering::Relaxed);
    assert_eq!(seq_after % 2, 0, "reconcile must leave seqlock even");
}
// Basic acquire/release lifecycle of the serialized-writer indicator.
#[test]
fn test_serialized_writer_acquire_release() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    assert!(layout.check_serialized_writer().is_none());
    assert!(layout.acquire_serialized_writer(42, 1234, 999, 10_000));
    assert!(layout.check_serialized_writer().is_some());
    assert_eq!(layout.check_serialized_writer().unwrap().get(), 42);
    assert!(layout.release_serialized_writer(42));
    assert!(layout.check_serialized_writer().is_none());
}
// A second release of the same writer must fail.
#[test]
fn test_serialized_writer_idempotent_release() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    assert!(layout.acquire_serialized_writer(42, 1234, 999, 10_000));
    assert!(layout.release_serialized_writer(42));
    assert!(!layout.release_serialized_writer(42));
}
// A held indicator blocks a second writer until released.
#[test]
fn test_serialized_writer_blocks_second_writer_txn_id() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    assert!(layout.acquire_serialized_writer(42, 1234, 999, 10_000));
    assert!(!layout.acquire_serialized_writer(99, 5678, 888, 10_000));
    assert!(layout.release_serialized_writer(42));
}
// Acquire must record lease expiry, pid, and pid birth.
#[test]
fn test_serialized_writer_lease_set() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let lease_expiry = 10_000_u64;
    assert!(layout.acquire_serialized_writer(42, 1234, 999, lease_expiry));
    assert_eq!(
        layout
            .serialized_writer_lease_expiry
            .load(Ordering::Relaxed),
        lease_expiry
    );
    assert_eq!(
        layout.serialized_writer_pid_and_gen.load(Ordering::Relaxed) as u32,
        1234
    );
    assert_eq!(
        layout.serialized_writer_pid_birth.load(Ordering::Relaxed),
        999
    );
}
// A buffer shorter than the header must be rejected up front.
#[test]
fn test_buffer_too_small() {
    let buf = vec![0u8; 100];
    assert_eq!(
        SharedMemoryLayout::open(&buf).unwrap_err(),
        MvccError::ShmTooSmall
    );
}
// An expired lease lets the exclusion check CAS-clear everything.
#[test]
fn test_stale_indicator_cleared_by_cas() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let now = 100_u64;
    assert!(layout.acquire_serialized_writer(42, 1234, 999, now.saturating_sub(1)));
    assert!(layout.check_serialized_writer().is_some());
    let res = layout.check_serialized_writer_exclusion(now, |_pid, _birth| false);
    assert!(res.is_ok(), "stale indicator should be cleared");
    assert!(layout.check_serialized_writer().is_none());
    assert_eq!(
        layout.serialized_writer_pid_and_gen.load(Ordering::Relaxed) as u32,
        0
    );
    assert_eq!(
        layout.serialized_writer_pid_birth.load(Ordering::Relaxed),
        0
    );
    assert_eq!(
        layout
            .serialized_writer_lease_expiry
            .load(Ordering::Relaxed),
        0
    );
}
// now == lease_expiry counts as expired (boundary is inclusive).
#[test]
fn test_stale_indicator_clears_when_now_equals_lease_expiry() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let now = 500_u64;
    assert!(layout.acquire_serialized_writer(77, 4321, 123, now));
    assert!(layout.check_serialized_writer().is_some());
    let res = layout.check_serialized_writer_exclusion(now, |_pid, _birth| true);
    assert!(res.is_ok(), "lease boundary should be treated as stale");
    assert!(
        layout.check_serialized_writer().is_none(),
        "stale indicator must be cleared at lease boundary"
    );
}
// A writer appearing between staleness detection and the CAS must win.
#[test]
fn test_cas_retry_on_new_writer_during_stale_clear() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let now = 100_u64;
    assert!(layout.acquire_serialized_writer(42, 1234, 999, now.saturating_sub(1)));
    let mut injected = false;
    let res = layout.check_serialized_writer_exclusion_with_hook(
        now,
        |_pid, _birth| true,
        &mut |writer_txn_id| {
            if injected {
                return;
            }
            injected = true;
            assert_eq!(writer_txn_id, 42);
            assert!(layout.release_serialized_writer(42));
            assert!(layout.acquire_serialized_writer(99, 5678, 888, now + 10_000));
        },
    );
    assert_eq!(
        res.unwrap_err(),
        MvccError::Busy,
        "new writer should block stale cleanup completion"
    );
    assert_eq!(layout.check_serialized_writer().unwrap().get(), 99);
}
// Every supported page size must round-trip through the header.
#[test]
fn test_various_page_sizes() {
    for &ps_raw in &[512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] {
        let ps = PageSize::new(ps_raw).unwrap();
        let layout = SharedMemoryLayout::new(ps, 64);
        let bytes = layout.to_bytes();
        let restored = SharedMemoryLayout::open(&bytes).unwrap();
        assert_eq!(restored.page_size(), ps);
    }
}
// Ids are handed out sequentially starting at 1.
#[test]
fn test_alloc_txn_id() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let id1 = layout.alloc_txn_id().unwrap();
    let id2 = layout.alloc_txn_id().unwrap();
    assert_eq!(id1.get(), 1);
    assert_eq!(id2.get(), 2);
}
// Concurrent allocation must never hand out duplicates.
#[test]
fn test_alloc_txn_id_threaded() {
    let layout = Arc::new(SharedMemoryLayout::new(PageSize::DEFAULT, 64));
    let mut all_ids: Vec<u64> = (0..4)
        .map(|_| {
            let l = Arc::clone(&layout);
            thread::spawn(move || {
                let mut ids = Vec::with_capacity(100);
                for _ in 0..100 {
                    ids.push(l.alloc_txn_id().unwrap().get());
                }
                ids
            })
        })
        .flat_map(|h| h.join().unwrap())
        .collect();
    all_ids.sort_unstable();
    all_ids.dedup();
    assert_eq!(all_ids.len(), 400, "all 400 TxnIds must be unique");
}
// Serialized header carries the expected magic, version, and checksum.
#[test]
fn test_shm_magic_version_checksum() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 128);
    let bytes = layout.to_bytes();
    assert_eq!(&bytes[0..8], b"FSQLSHM\0", "magic must be FSQLSHM\\0");
    let version = read_u32(&bytes, offsets::VERSION);
    assert_eq!(version, 1, "layout version must be 1");
    let stored = read_u64(&bytes, offsets::LAYOUT_CHECKSUM);
    assert_eq!(
        stored,
        layout.layout_checksum(),
        "stored checksum must match computed checksum"
    );
}
// The alignment padding word must serialize as zero.
#[test]
#[allow(clippy::items_after_statements)]
fn test_align_padding_fields_are_zero() {
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let bytes = layout.to_bytes();
    let align0 = read_u32(&bytes, offsets::ALIGN0);
    assert_eq!(align0, 0, "_align0 padding must be 0");
}
#[test]
fn test_begin_snapshot_publish_stale_odd_handled() {
let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
layout.begin_snapshot_publish();
let seq_odd = layout.snapshot_seq.load(Ordering::Relaxed);
assert_eq!(seq_odd % 2, 1);
layout.begin_snapshot_publish();
let seq_still_odd = layout.snapshot_seq.load(Ordering::Relaxed);
assert_eq!(
seq_still_odd, seq_odd,
"begin on stale odd must not increment further"
);
}
#[test]
fn test_load_consistent_snapshot_retries_until_even() {
// Seqlock reader contract: while a publish is in flight (odd sequence),
// load_consistent_snapshot must spin and only return once the sequence is
// even again, so a reader can never observe a torn snapshot.
let layout = Arc::new(SharedMemoryLayout::new(PageSize::DEFAULT, 64));
// Publish an initial fully-consistent snapshot (sequence ends even).
layout.publish_snapshot(CommitSeq::new(10), SchemaEpoch::new(20), 30);
// Start a publish but deliberately do not finish it: sequence is now odd.
layout.begin_snapshot_publish();
// Overwrite all three fields while the sequence is odd; a reader that
// ignored the seqlock could observe any mix of old and new values here.
layout.schema_epoch.store(200, Ordering::Release);
layout.ecs_epoch.store(300, Ordering::Release);
layout.commit_seq.store(100, Ordering::Release);
let reader_layout = Arc::clone(&layout);
// Reader thread: must block (retry) until the publish completes.
let reader = thread::spawn(move || reader_layout.load_consistent_snapshot());
// Give the reader time to enter its retry loop before releasing it.
// NOTE(review): 5 ms is a heuristic; if the reader has not started by then
// the test still passes, it just exercises less of the retry path.
thread::sleep(std::time::Duration::from_millis(5));
layout.end_snapshot_publish();
// Once released, the reader must report only the new, complete triple.
let snap = reader.join().unwrap();
assert_eq!(snap.commit_seq.get(), 100);
assert_eq!(snap.schema_epoch.get(), 200);
assert_eq!(snap.ecs_epoch, 300);
}
#[test]
fn test_serialized_writer_zero_means_no_writer() {
    // A freshly initialized header holds writer_txn_id = 0, the sentinel
    // value for "no serialized writer currently holds the lease".
    let shm = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let active = shm.check_serialized_writer();
    assert!(
        active.is_none(),
        "writer_txn_id=0 must mean no active serialized writer"
    );
}
#[test]
fn test_serialized_writer_aux_cleared_on_release() {
    // Acquire the serialized-writer lease, then verify that releasing it
    // clears every auxiliary field (PID/generation, PID birth time, lease
    // expiry) — not just the owning txn id.
    let shm = SharedMemoryLayout::new(PageSize::DEFAULT, 64);

    // Acquire as txn 42 with pid 1234, birth 999, lease expiry 10_000.
    assert!(shm.acquire_serialized_writer(42, 1234, 999, 10_000));
    let pid = shm.serialized_writer_pid_and_gen.load(Ordering::Relaxed) as u32;
    assert_eq!(pid, 1234);
    let birth = shm.serialized_writer_pid_birth.load(Ordering::Relaxed);
    assert_eq!(birth, 999);

    // Release by the same txn id; all auxiliary state must return to zero.
    assert!(shm.release_serialized_writer(42));
    assert!(shm.check_serialized_writer().is_none());
    assert_eq!(
        shm.serialized_writer_pid_and_gen.load(Ordering::Relaxed) as u32,
        0,
        "PID must be cleared on release"
    );
    assert_eq!(
        shm.serialized_writer_pid_birth.load(Ordering::Relaxed),
        0,
        "PID birth must be cleared on release"
    );
    assert_eq!(
        shm.serialized_writer_lease_expiry.load(Ordering::Relaxed),
        0,
        "lease expiry must be cleared on release"
    );
}
#[test]
fn test_zero_max_txn_slots_uses_default() {
    // Passing 0 for max_txn_slots is a "use the default" request; the
    // constructor must substitute DEFAULT_MAX_TXN_SLOTS (128).
    let shm = SharedMemoryLayout::new(PageSize::DEFAULT, 0);
    assert_eq!(
        shm.max_txn_slots(),
        128,
        "zero max_txn_slots must use default (128)"
    );
}
#[test]
fn test_shm_reconciliation_never_ahead_of_durable() {
    // Reconciliation pins the shared snapshot to durable storage: when the
    // in-memory view ran ahead of durable state it must be rolled back, and
    // when durable state advanced it must be rolled forward.
    let shm = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    shm.publish_snapshot(CommitSeq::new(100), SchemaEpoch::new(50), 30);

    // Durable state is behind the published snapshot: clamp down.
    shm.reconcile(CommitSeq::new(95), SchemaEpoch::new(45), 25);
    let clamped = shm.load_consistent_snapshot();
    assert_eq!(
        clamped.commit_seq.get(),
        95,
        "commit_seq must be clamped to durable"
    );
    assert_eq!(
        clamped.schema_epoch.get(),
        45,
        "schema_epoch must be clamped to durable"
    );

    // Durable state is ahead of the snapshot: advance.
    shm.reconcile(CommitSeq::new(200), SchemaEpoch::new(100), 60);
    let advanced = shm.load_consistent_snapshot();
    assert_eq!(
        advanced.commit_seq.get(),
        200,
        "commit_seq must advance to durable"
    );
    assert_eq!(
        advanced.schema_epoch.get(),
        100,
        "schema_epoch must advance to durable"
    );
}
#[test]
fn test_ddl_schema_epoch_stored_before_commit_seq() {
    // The publisher bumps (commit_seq, schema_epoch, ecs_epoch) together
    // under the seqlock. A concurrent reader must never observe a commit_seq
    // paired with epochs from a different publish — the invariant here is
    // schema_epoch == 2 * commit_seq and ecs_epoch == 3 * commit_seq.
    let shm = Arc::new(SharedMemoryLayout::new(PageSize::DEFAULT, 64));

    let publisher = {
        let shm = Arc::clone(&shm);
        thread::spawn(move || {
            for i in 1..=500_u64 {
                shm.publish_snapshot(CommitSeq::new(i), SchemaEpoch::new(i * 2), i * 3);
            }
        })
    };

    let observer = {
        let shm = Arc::clone(&shm);
        thread::spawn(move || {
            for _ in 0..2000 {
                let snap = shm.load_consistent_snapshot();
                let cs = snap.commit_seq.get();
                // cs == 0 means this load raced ahead of the first publish.
                if cs > 0 {
                    assert_eq!(
                        snap.schema_epoch.get(),
                        cs * 2,
                        "DDL ordering violation: schema_epoch inconsistent with commit_seq"
                    );
                    assert_eq!(
                        snap.ecs_epoch,
                        cs * 3,
                        "ecs_epoch inconsistent with commit_seq"
                    );
                }
            }
        })
    };

    publisher.join().unwrap();
    observer.join().unwrap();
}
#[test]
fn test_no_reinterpret_cast_safe_mmap_only() {
    // The wire format must be produced through explicit serialization
    // (`to_bytes` / `open`), never by reinterpret-casting the in-memory
    // struct: SharedMemoryLayout contains atomics and has no stable repr,
    // so its in-memory size is unrelated to the 216-byte wire header.
    //
    // (The previous version computed `size_of::<SharedMemoryLayout>()` into
    // a dead local suppressed with `let _ = mem_size;` — removed.)
    let layout = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
    let bytes = layout.to_bytes();
    assert_eq!(
        bytes.len(),
        216,
        "serialized header must be exactly 216 bytes"
    );
    // Round-trip through the explicit parser must succeed.
    let _restored = SharedMemoryLayout::open(&bytes).unwrap();
    assert_eq!(
        SharedMemoryLayout::HEADER_SIZE,
        216,
        "wire-format header must be exactly 216 bytes"
    );
}
mod proptests {
    // Property-based coverage for the seqlock snapshot protocol and the
    // deterministic layout checksum.
    use super::*;
    use proptest::prelude::*;
    proptest! {
        // After a single publish, a consistent load must observe exactly the
        // published (commit_seq, schema_epoch, ecs_epoch) triple — never a mix.
        #[test]
        fn seqlock_never_returns_mixed_snapshot(
            cs in 0_u64..10_000,
            se in 0_u64..10_000,
            ee in 0_u64..10_000,
        ) {
            let shm = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
            shm.publish_snapshot(CommitSeq::new(cs), SchemaEpoch::new(se), ee);
            let observed = shm.load_consistent_snapshot();
            prop_assert_eq!(observed.commit_seq.get(), cs);
            prop_assert_eq!(observed.schema_epoch.get(), se);
            prop_assert_eq!(observed.ecs_epoch, ee);
        }
        // Reconciliation overwrites the published snapshot with exactly the
        // durable values, whether that moves the view backward or forward.
        #[test]
        fn reconciliation_sets_exact_values(
            durable_cs in 0_u64..10_000,
            durable_se in 0_u64..10_000,
            durable_ee in 0_u64..10_000,
        ) {
            let shm = SharedMemoryLayout::new(PageSize::DEFAULT, 64);
            shm.publish_snapshot(CommitSeq::new(5000), SchemaEpoch::new(5000), 5000);
            shm.reconcile(
                CommitSeq::new(durable_cs),
                SchemaEpoch::new(durable_se),
                durable_ee,
            );
            let observed = shm.load_consistent_snapshot();
            prop_assert_eq!(observed.commit_seq.get(), durable_cs);
            prop_assert_eq!(observed.schema_epoch.get(), durable_se);
            prop_assert_eq!(observed.ecs_epoch, durable_ee);
        }
        // Two layouts built from identical (page_size, slots) parameters must
        // produce the same layout checksum.
        #[test]
        fn checksum_deterministic_property(
            ps_idx in 0_usize..8,
            slots in 1_u32..512,
        ) {
            let valid_page_sizes = [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536_u32];
            let ps = PageSize::new(valid_page_sizes[ps_idx]).unwrap();
            let first = SharedMemoryLayout::new(ps, slots);
            let second = SharedMemoryLayout::new(ps, slots);
            prop_assert_eq!(first.layout_checksum(), second.layout_checksum());
        }
    }
}
}