use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use smallvec::SmallVec;
use fsqlite_types::sync_primitives::{Mutex, RwLock};
use fsqlite_types::{
CommitSeq, PageNumber, PageNumberBuildHasher, PageSize, PageVersion, Snapshot, TxnId,
VersionPointer,
};
use crate::cache_aligned::CacheAligned;
use crate::core_types::{Transaction, VersionArena, VersionIdx};
use crate::ebr::{EbrRetireQueue, VersionGuardRegistry};
use crate::gc::{GcTickResult, GcTodo, gc_tick_with_registry, prune_page_chain_with_registry};
use crate::observability::record_cas_attempt;
/// Allocates transaction ids and commit sequence numbers, and tracks which
/// commit sequences are still in flight so a "stable" watermark (the highest
/// commit seq with no earlier active commit) can be derived.
pub struct TxnManager {
// Next TxnId to hand out; advanced with a CAS loop in `alloc_txn_id`.
next_txn_id: AtomicU64,
// Next commit sequence; advanced with fetch_add under the active_commits lock.
next_commit_seq: AtomicU64,
// Commit seqs allocated but not yet finished; kept in ascending order because
// allocation happens under this mutex.
active_commits: Mutex<SmallVec<[u64; 16]>>,
// Watermark: every commit seq <= this value has finished.
stable_commit_seq: AtomicU64,
}
impl TxnManager {
/// Builds a manager whose txn-id and commit-seq counters start at the given
/// values; the stable watermark starts one below the first commit seq.
#[must_use]
pub fn new(initial_txn_id: u64, initial_commit_seq: u64) -> Self {
Self {
next_txn_id: AtomicU64::new(initial_txn_id),
next_commit_seq: AtomicU64::new(initial_commit_seq),
active_commits: Mutex::new(SmallVec::new()),
stable_commit_seq: AtomicU64::new(initial_commit_seq.saturating_sub(1)),
}
}
/// Allocates the next transaction id via a CAS loop.
///
/// Returns `None` once the counter has passed `TxnId::MAX_RAW` (id space
/// exhausted) or if the increment would overflow `u64`.
pub fn alloc_txn_id(&self) -> Option<TxnId> {
loop {
let current = self.next_txn_id.load(Ordering::Acquire);
if current > TxnId::MAX_RAW {
return None; }
let next = current.checked_add(1)?;
// Weak CAS may fail spuriously; the surrounding loop retries.
if self
.next_txn_id
.compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return TxnId::new(current);
}
}
}
/// Allocates the next commit sequence and records it as in-flight.
///
/// The counter is advanced while `active_commits` is locked, so entries are
/// pushed in ascending order and `active_commits.first()` is the minimum.
#[allow(clippy::significant_drop_tightening)]
pub fn alloc_commit_seq(&self) -> CommitSeq {
let mut active = self.active_commits.lock();
let seq = self.next_commit_seq.fetch_add(1, Ordering::Release);
active.push(seq);
CommitSeq::new(seq)
}
/// Marks `seq` as finished and advances the stable watermark.
///
/// The new watermark is one below the smallest still-active commit seq, or
/// one below the next unallocated seq when nothing is in flight.
pub fn finish_commit_seq(&self, seq: CommitSeq) {
let mut active = self.active_commits.lock();
let raw = seq.get();
if let Some(pos) = active.iter().position(|&s| s == raw) {
active.remove(pos);
} else {
debug_assert!(false, "finished commit seq {raw} was not active");
}
// `active` is sorted by construction (see alloc_commit_seq), so `first()`
// is the minimum still-active seq.
let new_stable = if let Some(&min_active) = active.first() {
min_active.saturating_sub(1)
} else {
self.next_commit_seq
.load(Ordering::Acquire)
.saturating_sub(1)
};
drop(active);
// fetch_max keeps the watermark monotonic even if finishers race to
// publish their computed values out of order.
self.stable_commit_seq
.fetch_max(new_stable, Ordering::Release);
}
/// Raw value of the txn-id counter (next id to be handed out).
#[must_use]
pub fn current_txn_counter(&self) -> u64 {
self.next_txn_id.load(Ordering::Acquire)
}
/// Stable watermark plus one, i.e. the lowest commit seq not yet finished.
#[must_use]
pub fn current_commit_counter(&self) -> u64 {
self.stable_commit_seq
.load(Ordering::Acquire)
.saturating_add(1)
}
}
impl Default for TxnManager {
fn default() -> Self {
Self::new(1, 1)
}
}
impl std::fmt::Debug for TxnManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Relaxed loads are fine here: Debug output is advisory and makes no
        // synchronization promises.
        let next_txn_id = self.next_txn_id.load(Ordering::Relaxed);
        let next_commit_seq = self.next_commit_seq.load(Ordering::Relaxed);
        let stable_commit_seq = self.stable_commit_seq.load(Ordering::Relaxed);
        let mut dbg = f.debug_struct("TxnManager");
        dbg.field("next_txn_id", &next_txn_id);
        dbg.field("next_commit_seq", &next_commit_seq);
        dbg.field("active_commits", &*self.active_commits.lock());
        dbg.field("stable_commit_seq", &stable_commit_seq);
        dbg.finish()
    }
}
/// Snapshot-visibility predicate: a version is visible when it has actually
/// committed (`commit_seq != 0`) and committed no later than the snapshot's
/// high bound.
#[inline]
#[must_use]
pub fn visible(version: &PageVersion, snapshot: &Snapshot) -> bool {
    let seq = version.commit_seq;
    seq.get() != 0 && seq <= snapshot.high
}
/// Half-open commit-timestamp interval `[begin_ts, end_ts)` during which a
/// version is the current one for its page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VersionVisibilityRange {
// Commit sequence at which this version became current.
pub begin_ts: CommitSeq,
// Commit sequence of the superseding version; `None` while still current.
pub end_ts: Option<CommitSeq>,
}
impl VersionVisibilityRange {
    /// True when `snapshot_ts` falls inside the half-open interval
    /// `[begin_ts, end_ts)`; an absent `end_ts` means "still current".
    #[must_use]
    pub fn contains(self, snapshot_ts: CommitSeq) -> bool {
        let at_or_after_begin = snapshot_ts >= self.begin_ts;
        let before_end = self.end_ts.map_or(true, |end| snapshot_ts < end);
        at_or_after_begin && before_end
    }
}
/// Result of a snapshot resolution together with how many chain entries were
/// inspected (for observability).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnapshotResolveTrace {
// First visible version, or `None` if the walk found nothing visible.
pub version_idx: Option<VersionIdx>,
// Number of versions inspected before the walk terminated.
pub versions_traversed: u64,
}
/// Number of shards in the chain-head table; must remain a power of two
/// because shard selection masks with `CHAIN_HEAD_SHARDS - 1`.
pub const CHAIN_HEAD_SHARDS: usize = 64;
/// Sentinel slot value meaning "no chain head installed".
pub const CHAIN_HEAD_EMPTY: u64 = u64::MAX;
/// Outcome of a single compare-and-swap head installation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CasInstallResult {
// CAS succeeded; `previous` is the head that was replaced.
Installed { previous: Option<VersionIdx> },
// The head changed concurrently; the caller should re-read and retry.
Retry,
}
// One shard of the chain-head table: a page -> slot directory plus an
// append-only vector of cache-aligned atomic head words.
struct ChainHeadShard {
// Maps a page number to its index in `slots`.
directory: RwLock<HashMap<PageNumber, usize, PageNumberBuildHasher>>,
// Packed chain-head words; CHAIN_HEAD_EMPTY marks "no head".
slots: RwLock<Vec<CacheAligned<AtomicU64>>>,
}
impl ChainHeadShard {
// Empty shard: no pages mapped, no slots allocated.
fn new() -> Self {
Self {
directory: RwLock::new(HashMap::with_hasher(PageNumberBuildHasher::default())),
slots: RwLock::new(Vec::new()),
}
}
// Returns the slot index for `pgno`, allocating one on first use.
// Double-checked: optimistic read first, then re-check under the write lock
// so two racing callers cannot allocate two slots for the same page.
fn ensure_slot(&self, pgno: PageNumber) -> usize {
{
let dir = self.directory.read();
if let Some(&idx) = dir.get(&pgno) {
return idx;
}
}
let mut dir = self.directory.write();
if let Some(&idx) = dir.get(&pgno) {
return idx;
}
// Lock order within a shard is directory -> slots. Slots are append-only,
// so an index handed out once stays valid for the shard's lifetime.
let mut slots = self.slots.write();
let slot_idx = slots.len();
slots.push(CacheAligned::new(AtomicU64::new(CHAIN_HEAD_EMPTY)));
dir.insert(pgno, slot_idx);
slot_idx
}
// Read-only lookup; `None` means the page has never had a slot allocated.
fn slot_index(&self, pgno: PageNumber) -> Option<usize> {
let dir = self.directory.read();
dir.get(&pgno).copied()
}
}
/// Sharded map from page number to the packed head of that page's version
/// chain; head updates are lock-free CAS operations on atomic slot words.
pub struct ChainHeadTable {
// Fixed-size shard array; boxed so the table itself stays small.
shards: Box<[ChainHeadShard; CHAIN_HEAD_SHARDS]>,
}
impl ChainHeadTable {
#[must_use]
pub fn new() -> Self {
    // One empty shard per slot of the fixed-size table.
    let shards = std::array::from_fn(|_| ChainHeadShard::new());
    Self {
        shards: Box::new(shards),
    }
}
/// Maps a page number onto its shard. Relies on `CHAIN_HEAD_SHARDS` being a
/// power of two so the mask is equivalent to a modulo.
#[inline]
fn shard_index(pgno: PageNumber) -> usize {
    let raw = pgno.get() as usize;
    raw & (CHAIN_HEAD_SHARDS - 1)
}
// Packs a VersionIdx into one u64 slot word:
// bits 0-11 offset, bits 12-31 chunk, bits 32-63 generation.
// NOTE(review): an all-ones generation/chunk/offset would collide with the
// CHAIN_HEAD_EMPTY sentinel (u64::MAX); presumably unreachable — confirm.
#[inline]
fn pack_idx(idx: VersionIdx) -> u64 {
let chunk = u64::from(idx.chunk());
let offset = u64::from(idx.offset());
let generation = u64::from(idx.generation());
assert!(chunk <= 0xF_FFFF, "VersionIdx chunk overflow (max 20 bits)");
assert!(offset <= 0xFFF, "VersionIdx offset overflow (max 12 bits)");
(generation << 32) | (chunk << 12) | offset
}
// Inverse of `pack_idx`; the CHAIN_HEAD_EMPTY sentinel decodes to `None`.
#[inline]
fn unpack_idx(raw: u64) -> Option<VersionIdx> {
if raw == CHAIN_HEAD_EMPTY {
return None;
}
#[allow(clippy::cast_possible_truncation)]
let offset = (raw & 0xFFF) as u32;
#[allow(clippy::cast_possible_truncation)]
let chunk = ((raw >> 12) & 0xF_FFFF) as u32;
#[allow(clippy::cast_possible_truncation)]
let generation = (raw >> 32) as u32;
Some(VersionIdx::new(chunk, offset, generation))
}
/// Current chain head for `pgno`, or `None` when the page has no slot or the
/// slot holds the empty sentinel.
#[must_use]
pub fn get_head(&self, pgno: PageNumber) -> Option<VersionIdx> {
    let shard = &self.shards[Self::shard_index(pgno)];
    let slot_idx = shard.slot_index(pgno)?;
    let raw = shard.slots.read()[slot_idx].load(Ordering::Acquire);
    Self::unpack_idx(raw)
}
/// Single CAS attempt to swing `pgno`'s head from `expected_prev` to
/// `new_head`. Returns `Retry` when the head did not match the expectation.
pub fn install(
    &self,
    pgno: PageNumber,
    new_head: VersionIdx,
    expected_prev: Option<VersionIdx>,
) -> CasInstallResult {
    let shard = &self.shards[Self::shard_index(pgno)];
    let slot_idx = shard.ensure_slot(pgno);
    let expected_raw = match expected_prev {
        Some(idx) => Self::pack_idx(idx),
        None => CHAIN_HEAD_EMPTY,
    };
    let new_raw = Self::pack_idx(new_head);
    let slots = shard.slots.read();
    let swapped = slots[slot_idx]
        .compare_exchange(expected_raw, new_raw, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if swapped {
        CasInstallResult::Installed {
            previous: expected_prev,
        }
    } else {
        CasInstallResult::Retry
    }
}
/// Unconditionally installs `new_head` as `pgno`'s chain head, retrying the
/// CAS until it succeeds.
///
/// Returns the head that was replaced (`None` if the slot was empty) and the
/// number of CAS attempts, which callers feed into contention metrics.
pub fn install_with_retry(
    &self,
    pgno: PageNumber,
    new_head: VersionIdx,
) -> (Option<VersionIdx>, u32) {
    let shard = &self.shards[Self::shard_index(pgno)];
    let slot_idx = shard.ensure_slot(pgno);
    let slots = shard.slots.read();
    let new_raw = Self::pack_idx(new_head);
    let mut attempts = 0_u32;
    // Seed from a single load, then reuse the value returned by a failed CAS
    // instead of issuing a fresh load on every retry (the original re-loaded
    // each iteration, doing redundant atomic reads under contention).
    let mut current_raw = slots[slot_idx].load(Ordering::Acquire);
    loop {
        attempts += 1;
        match slots[slot_idx].compare_exchange_weak(
            current_raw,
            new_raw,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return (Self::unpack_idx(current_raw), attempts),
            Err(observed) => {
                // `observed` is the actual slot value (weak CAS may also fail
                // spuriously with an unchanged value); retry with it.
                current_raw = observed;
                std::hint::spin_loop();
            }
        }
    }
}
/// Clears `pgno`'s head if it still equals `expected`; returns whether the
/// clear happened. A page with no slot yields `false`.
pub fn remove(&self, pgno: PageNumber, expected: VersionIdx) -> bool {
    let shard = &self.shards[Self::shard_index(pgno)];
    let Some(slot_idx) = shard.slot_index(pgno) else {
        return false;
    };
    let expected_raw = Self::pack_idx(expected);
    let slots = shard.slots.read();
    slots[slot_idx]
        .compare_exchange(
            expected_raw,
            CHAIN_HEAD_EMPTY,
            Ordering::AcqRel,
            Ordering::Acquire,
        )
        .is_ok()
}
/// Invokes `f` for every page that currently has a non-empty head. Each shard
/// is visited under its own read locks; the snapshot is not atomic across
/// shards.
pub fn for_each_head(&self, mut f: impl FnMut(PageNumber, VersionIdx)) {
    for shard in self.shards.iter() {
        let dir = shard.directory.read();
        let slots = shard.slots.read();
        for (&pgno, &slot_idx) in dir.iter() {
            let raw = slots[slot_idx].load(Ordering::Acquire);
            let Some(idx) = Self::unpack_idx(raw) else {
                continue;
            };
            f(pgno, idx);
        }
    }
}
/// Number of pages whose slot currently holds a non-empty head. Counted per
/// shard under read locks; not atomic across shards.
#[must_use]
pub fn page_count(&self) -> usize {
    self.shards
        .iter()
        .map(|shard| {
            let dir = shard.directory.read();
            let slots = shard.slots.read();
            dir.values()
                .filter(|&&slot_idx| slots[slot_idx].load(Ordering::Relaxed) != CHAIN_HEAD_EMPTY)
                .count()
        })
        .sum()
}
}
impl Default for ChainHeadTable {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for ChainHeadTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("ChainHeadTable");
        dbg.field("shards", &CHAIN_HEAD_SHARDS);
        dbg.field("page_count", &self.page_count());
        dbg.finish()
    }
}
/// MVCC page-version store: arena-backed versions, per-page lock-free chain
/// heads, visibility-range bookkeeping, and epoch-gated slot recycling.
pub struct VersionStore {
// Slab of all PageVersions; entries are addressed by VersionIdx.
arena: RwLock<VersionArena>,
// Per-page atomic head pointers (packed VersionIdx words).
chain_heads: ChainHeadTable,
// `[begin_ts, end_ts)` interval for each live version index.
visibility_ranges: RwLock<HashMap<VersionIdx, VersionVisibilityRange>>,
// Page size this store serves; fixed at construction.
page_size: PageSize,
// Reader-guard registry consulted by the GC/prune paths before reclaiming.
guard_registry: Arc<VersionGuardRegistry>,
// Pruned indices wait here until enough GC epochs have elapsed to recycle.
retire_queue: EbrRetireQueue,
// Monotonic epoch counter, incremented once per gc_tick.
gc_epoch: AtomicU64,
}
impl VersionStore {
/// Creates a store with a private, default guard registry.
#[must_use]
pub fn new(page_size: PageSize) -> Self {
    let registry = Arc::new(VersionGuardRegistry::default());
    Self::new_with_guard_registry(page_size, registry)
}
/// Creates a store that shares `guard_registry` with other components.
#[must_use]
pub fn new_with_guard_registry(
    page_size: PageSize,
    guard_registry: Arc<VersionGuardRegistry>,
) -> Self {
    Self {
        page_size,
        guard_registry,
        arena: RwLock::new(VersionArena::new()),
        chain_heads: ChainHeadTable::new(),
        visibility_ranges: RwLock::new(HashMap::new()),
        retire_queue: EbrRetireQueue::new(),
        gc_epoch: AtomicU64::new(0),
    }
}
/// The guard registry this store consults before reclaiming versions.
#[must_use]
pub fn guard_registry(&self) -> &Arc<VersionGuardRegistry> {
    &self.guard_registry
}
/// Publishes `version` as the new chain head for its page and returns its
/// arena index.
///
/// Any `version.prev` supplied by the caller is overwritten: the previous
/// pointer is re-stamped from the observed head on every CAS attempt so the
/// chain stays consistent under concurrent publishers. The arena write lock
/// is held across the whole loop because the freshly allocated entry must be
/// mutated (`get_mut`) on each retry.
pub fn publish(&self, version: PageVersion) -> VersionIdx {
let pgno = version.pgno;
let begin_ts = version.commit_seq;
let shard = &self.chain_heads.shards[ChainHeadTable::shard_index(pgno)];
let slot_idx = shard.ensure_slot(pgno);
let mut arena = self.arena.write();
let idx = arena.alloc(version);
let new_raw = ChainHeadTable::pack_idx(idx);
let mut cas_attempts = 0_u32;
let previous_head = loop {
cas_attempts += 1;
let slots = shard.slots.read();
let current_raw = slots[slot_idx].load(Ordering::Acquire);
let prev = ChainHeadTable::unpack_idx(current_raw);
// Point the new version at the head observed just before the swap.
let v = arena.get_mut(idx).expect("just allocated");
v.prev = prev.map(idx_to_version_pointer);
match slots[slot_idx].compare_exchange_weak(
current_raw,
new_raw,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break prev,
Err(_) => {
std::hint::spin_loop();
}
}
};
drop(arena);
record_cas_attempt(cas_attempts);
// Open a visibility range for the new head and close the replaced head's
// range at this version's begin timestamp.
let mut ranges = self.visibility_ranges.write();
ranges.insert(
idx,
VersionVisibilityRange {
begin_ts,
end_ts: None,
},
);
if let Some(old_head) = previous_head {
if let Some(old_range) = ranges.get_mut(&old_head) {
old_range.end_ts = Some(begin_ts);
}
}
drop(ranges);
tracing::debug!(pgno = pgno.get(), "version published to chain head");
idx
}
/// Index of the first version on `page`'s chain visible to `snapshot`, or
/// `None` when nothing qualifies. Thin wrapper over [`Self::resolve_with_trace`].
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn resolve(&self, page: PageNumber, snapshot: &Snapshot) -> Option<VersionIdx> {
    let trace = self.resolve_with_trace(page, snapshot);
    trace.version_idx
}
/// Shared snapshot-resolution walk: finds the first version on `page`'s
/// chain visible to `snapshot` and maps it through `extract`.
///
/// The outer `'retry` loop restarts from a fresh head whenever an index read
/// from a chain pointer is no longer present in the arena (the chain was
/// pruned/recycled underneath us). The original code duplicated this entire
/// loop in `resolve_visible_version` and `resolve_visible_commit_seq`; both
/// now delegate here so the retry logic lives in one place.
#[allow(clippy::significant_drop_tightening)]
fn resolve_visible_with<T>(
    &self,
    page: PageNumber,
    snapshot: &Snapshot,
    extract: impl Fn(&PageVersion) -> T,
) -> Option<T> {
    'retry: loop {
        let Some(head_idx) = self.chain_heads.get_head(page) else {
            crate::observability::record_snapshot_read_versions_traversed(0);
            return None;
        };
        let arena = self.arena.read();
        let mut current_idx = head_idx;
        let mut traversed = 0;
        loop {
            let Some(version) = arena.get(current_idx) else {
                // Version vanished mid-walk: restart from the current head.
                continue 'retry;
            };
            traversed += 1;
            if visible(version, snapshot) {
                crate::observability::record_snapshot_read_versions_traversed(traversed);
                return Some(extract(version));
            }
            if let Some(prev_ptr) = version.prev {
                current_idx = version_pointer_to_idx(prev_ptr);
            } else {
                crate::observability::record_snapshot_read_versions_traversed(traversed);
                return None;
            }
        }
    }
}
/// Clone of the first version on `page`'s chain visible to `snapshot`, or
/// `None` when no committed version qualifies.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn resolve_visible_version(
    &self,
    page: PageNumber,
    snapshot: &Snapshot,
) -> Option<PageVersion> {
    self.resolve_visible_with(page, snapshot, Clone::clone)
}
/// Like [`Self::resolve_visible_version`] but returns only the commit
/// sequence, avoiding the page-data clone.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn resolve_visible_commit_seq(
    &self,
    page: PageNumber,
    snapshot: &Snapshot,
) -> Option<CommitSeq> {
    self.resolve_visible_with(page, snapshot, |version| version.commit_seq)
}
/// Clones the current chain-head version of `page`, retrying if the head is
/// recycled between reading the head index and fetching it from the arena.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn chain_head_version(&self, page: PageNumber) -> Option<PageVersion> {
'retry: loop {
let head_idx = self.chain_heads.get_head(page)?;
let arena = self.arena.read();
let Some(version) = arena.get(head_idx) else {
// Head was pruned/recycled after we read it; re-read the head.
continue 'retry;
};
return Some(version.clone());
}
}
/// Resolves `page` under `snapshot`, additionally reporting how many chain
/// entries were inspected.
///
/// Restarts from a fresh head whenever an index read from the chain is no
/// longer present in the arena (pruned/recycled concurrently).
// NOTE(review): `inline(never)` presumably keeps this hot path visible as a
// distinct frame in profiles — confirm before removing.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
#[inline(never)]
pub fn resolve_with_trace(
&self,
page: PageNumber,
snapshot: &Snapshot,
) -> SnapshotResolveTrace {
'retry: loop {
let Some(head_idx) = self.chain_heads.get_head(page) else {
return SnapshotResolveTrace {
version_idx: None,
versions_traversed: 0,
};
};
let arena = self.arena.read();
let mut current_idx = head_idx;
let mut traversed = 0_u64;
loop {
let Some(version) = arena.get(current_idx) else {
continue 'retry;
};
traversed = traversed.saturating_add(1);
if visible(version, snapshot) {
return SnapshotResolveTrace {
version_idx: Some(current_idx),
versions_traversed: traversed,
};
}
// Reached the tail without a visible version: report the miss.
let Some(prev_ptr) = version.prev else {
return SnapshotResolveTrace {
version_idx: None,
versions_traversed: traversed,
};
};
current_idx = version_pointer_to_idx(prev_ptr);
}
}
}
/// Resolves `page` under `txn`'s snapshot.
///
/// The original body branched on `txn.write_set.contains(&page)` but both
/// arms performed the identical snapshot resolve, so the conditional was
/// dead code and has been collapsed.
// NOTE(review): if pages in `txn.write_set` are ever supposed to resolve to
// the transaction's own uncommitted version, that logic still needs to be
// added here — the original code did not implement it either.
#[must_use]
pub fn resolve_for_txn(&self, page: PageNumber, txn: &Transaction) -> Option<VersionIdx> {
    self.resolve(page, &txn.snapshot)
}
/// Clone of the version stored at `idx`, if the slot is live.
#[must_use]
pub fn get_version(&self, idx: VersionIdx) -> Option<PageVersion> {
    self.arena.read().get(idx).cloned()
}
/// Current chain-head index for `page`, if any.
#[must_use]
pub fn chain_head(&self, page: PageNumber) -> Option<VersionIdx> {
    self.chain_heads.get_head(page)
}
/// Visibility interval recorded for `idx`, if still tracked.
#[must_use]
pub fn visibility_range(&self, idx: VersionIdx) -> Option<VersionVisibilityRange> {
    self.visibility_ranges.read().get(&idx).copied()
}
/// Snapshot of `page`'s full version chain, newest first.
///
/// If the walk observes a truncated chain (an entry vanished from the arena
/// mid-walk), the whole walk is retried so the returned vector is always a
/// consistent head-to-tail view.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn walk_chain(&self, page: PageNumber) -> Vec<PageVersion> {
loop {
let Some(head_idx) = self.chain_heads.get_head(page) else {
return Vec::new();
};
let arena = self.arena.read();
let mut result = Vec::new();
let mut current_idx = head_idx;
let mut race_detected = false;
while let Some(version) = arena.get(current_idx) {
let prev = version.prev;
result.push(version.clone());
match prev {
Some(ptr) => current_idx = version_pointer_to_idx(ptr),
None => break,
}
}
// A complete walk ends at a tail (prev == None). If the last collected
// element still has a prev, `arena.get` failed mid-chain; if nothing was
// collected at all, the head itself disappeared. Either way: retry.
if let Some(last) = result.last() {
if last.prev.is_some() {
race_detected = true;
}
} else {
race_detected = true;
}
if race_detected {
continue;
}
return result;
}
}
/// Page size this store was constructed with.
#[must_use]
pub fn page_size(&self) -> PageSize {
self.page_size
}
/// Number of pages that currently have a non-empty chain head.
#[must_use]
pub fn page_count(&self) -> usize {
self.chain_heads.page_count()
}
/// Runs one garbage-collection tick: bumps the GC epoch, recycles retired
/// slots that are old enough, prunes chains up to `horizon`, drops the
/// visibility ranges of pruned versions, and parks the pruned indices for a
/// later epoch-gated recycle.
#[allow(clippy::significant_drop_tightening)]
pub fn gc_tick(&self, todo: &mut GcTodo, horizon: CommitSeq) -> GcTickResult {
let current_epoch = self.gc_epoch.fetch_add(1, Ordering::Relaxed);
self.try_recycle_retired_slots(current_epoch);
let mut arena = self.arena.write();
let result = gc_tick_with_registry(
todo,
horizon,
&mut arena,
&self.chain_heads,
self.guard_registry(),
);
drop(arena);
// Forget visibility bookkeeping for everything that was pruned.
let mut ranges = self.visibility_ranges.write();
for idx in &result.pruned_indices {
ranges.remove(idx);
}
drop(ranges);
// Slots are not reused immediately; they wait in the retire queue until
// enough epochs have elapsed (see `try_recycle_retired_slots`).
if !result.pruned_indices.is_empty() {
self.retire_queue
.retire_batch(result.pruned_indices.iter().copied(), current_epoch);
}
result
}
/// Returns retired slots to the arena free list once they are at least
/// `MIN_EPOCH_GAP` epochs old; returns the number recycled.
pub fn try_recycle_retired_slots(&self, current_epoch: u64) -> usize {
// Grace period (in GC epochs) a retired slot must wait before reuse.
const MIN_EPOCH_GAP: u64 = 2;
let drained = self
.retire_queue
.drain_if_safe(current_epoch, MIN_EPOCH_GAP);
if drained.is_empty() {
return 0;
}
let count = drained.len();
let mut arena = self.arena.write();
arena.recycle_slots(drained);
drop(arena);
tracing::debug!(
target: "fsqlite_mvcc::gc",
recycled = count,
current_epoch,
"recycled retired arena slots"
);
count
}
/// Drains the retire queue unconditionally (no epoch gating) and recycles
/// every drained slot; returns how many were recycled.
pub fn force_recycle_all_retired_slots(&self) -> usize {
    let drained = self.retire_queue.force_drain();
    let count = drained.len();
    if count > 0 {
        let mut arena = self.arena.write();
        arena.recycle_slots(drained);
    }
    count
}
/// Number of retired slots still waiting in the queue.
#[must_use]
pub fn pending_recycle_count(&self) -> usize {
    self.retire_queue.pending_count()
}
/// Average chain length over up to `sample_limit` pages, or 0.0 when there
/// are no heads to sample.
///
/// `for_each_head` cannot terminate early, so once the limit is reached the
/// closure returns without walking — remaining heads are still visited but
/// their chains are not traversed.
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn sample_chain_pressure(&self, sample_limit: usize) -> f64 {
let arena = self.arena.read();
let mut total_length = 0_usize;
let mut sampled = 0_usize;
self.chain_heads.for_each_head(|_pgno, head_idx| {
if sampled >= sample_limit {
return;
}
let mut current_idx = head_idx;
let mut chain_len = 0_usize;
while let Some(version) = arena.get(current_idx) {
chain_len += 1;
match version.prev {
Some(ptr) => current_idx = version_pointer_to_idx(ptr),
None => break,
}
}
total_length += chain_len;
sampled += 1;
});
if sampled == 0 {
0.0
} else {
total_length as f64 / sampled as f64
}
}
/// Length of `page`'s version chain; returns 0 for pages with no head and
/// retries the walk if an entry vanishes from the arena mid-traversal.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn chain_length(&self, page: PageNumber) -> usize {
loop {
let Some(head_idx) = self.chain_heads.get_head(page) else {
return 0;
};
let arena = self.arena.read();
let mut len = 0_usize;
let mut current_idx = head_idx;
let mut raced = false;
loop {
let Some(version) = arena.get(current_idx) else {
// Entry vanished mid-walk; restart with a fresh head.
raced = true;
break;
};
len = len.saturating_add(1);
match version.prev {
Some(ptr) => current_idx = version_pointer_to_idx(ptr),
None => break,
}
}
if raced {
continue;
}
return len;
}
}
/// Eagerly prunes `page`'s chain with respect to `horizon`.
///
/// Pruned indices lose their visibility ranges immediately but are only
/// parked in the retire queue; actual slot reuse happens later through the
/// epoch-gated recycle path. Returns the freed count clamped to `usize`.
#[must_use]
#[allow(clippy::significant_drop_tightening)]
pub fn prune_page_chain_eager(&self, page: PageNumber, horizon: CommitSeq) -> usize {
let mut arena = self.arena.write();
let result = prune_page_chain_with_registry(
page,
horizon,
&mut arena,
&self.chain_heads,
self.guard_registry(),
);
drop(arena);
if !result.pruned_indices.is_empty() {
let mut ranges = self.visibility_ranges.write();
for idx in &result.pruned_indices {
ranges.remove(idx);
}
let current_epoch = self.gc_epoch.load(Ordering::Relaxed);
self.retire_queue
.retire_batch(result.pruned_indices.iter().copied(), current_epoch);
}
usize::try_from(result.freed).unwrap_or(usize::MAX)
}
}
impl std::fmt::Debug for VersionStore {
// NOTE(review): takes arena and visibility_ranges read locks — assumes the
// calling thread does not already hold either for write, or this deadlocks
// (depends on the sync_primitives RwLock's reentrancy; confirm).
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let arena = self.arena.read();
let ranges = self.visibility_ranges.read();
f.debug_struct("VersionStore")
.field("page_size", &self.page_size.get())
.field("page_count", &self.chain_heads.page_count())
.field("visibility_range_count", &ranges.len())
.field("arena_high_water", &arena.high_water())
.field("pending_recycle_count", &self.pending_recycle_count())
.field("gc_epoch", &self.gc_epoch.load(Ordering::Relaxed))
.field("guard_registry", &self.guard_registry)
.finish_non_exhaustive()
}
}
/// In-process writer mutex for serialized write transactions, tracking which
/// transaction currently holds it.
pub struct SerializedWriteMutex {
// `Some(txn)` while held; `None` when free.
inner: Mutex<Option<TxnId>>,
}
impl SerializedWriteMutex {
    /// Creates an unheld mutex.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(None),
        }
    }
    /// Attempts to take the mutex for `txn`.
    ///
    /// Re-acquisition by the current holder succeeds silently; contention
    /// returns `Err` carrying the holder's id.
    pub fn try_acquire(&self, txn: TxnId) -> Result<(), TxnId> {
        let mut guard = self.inner.lock();
        match *guard {
            None => {
                *guard = Some(txn);
                drop(guard);
                // Log only on a fresh acquisition, not on re-entry.
                tracing::info!(txn_id = %txn, "serialized write mutex acquired");
                Ok(())
            }
            Some(holder) if holder == txn => Ok(()),
            Some(holder) => Err(holder),
        }
    }
    /// Releases the mutex if (and only if) `txn` is the current holder;
    /// returns whether a release happened.
    pub fn release(&self, txn: TxnId) -> bool {
        let mut guard = self.inner.lock();
        let held_by_txn = *guard == Some(txn);
        if held_by_txn {
            *guard = None;
            drop(guard);
            tracing::info!(txn_id = %txn, "serialized write mutex released");
        }
        held_by_txn
    }
    /// Current holder, if any.
    #[must_use]
    pub fn holder(&self) -> Option<TxnId> {
        *self.inner.lock()
    }
}
impl Default for SerializedWriteMutex {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for SerializedWriteMutex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SerializedWriteMutex")
.field("holder", &self.holder())
.finish()
}
}
/// Decodes a packed `VersionPointer` into a `VersionIdx`.
///
/// Bit layout mirrors `ChainHeadTable::pack_idx` / `idx_to_version_pointer`:
/// bits 0-11 offset, bits 12-31 chunk, bits 32-63 generation.
#[inline]
#[must_use]
fn version_pointer_to_idx(ptr: VersionPointer) -> VersionIdx {
let raw = ptr.get();
#[allow(clippy::cast_possible_truncation)]
let offset = (raw & 0xFFF) as u32;
#[allow(clippy::cast_possible_truncation)]
let chunk = ((raw >> 12) & 0xF_FFFF) as u32;
#[allow(clippy::cast_possible_truncation)]
let generation = (raw >> 32) as u32;
VersionIdx::new(chunk, offset, generation)
}
/// Encodes a `VersionIdx` as a packed `VersionPointer` (inverse of
/// `version_pointer_to_idx`); panics if chunk or offset exceed their fields.
#[inline]
#[must_use]
pub fn idx_to_version_pointer(idx: VersionIdx) -> VersionPointer {
let chunk = u64::from(idx.chunk());
let offset = u64::from(idx.offset());
let generation = u64::from(idx.generation());
assert!(chunk <= 0xF_FFFF, "VersionIdx chunk overflow (max 20 bits)");
assert!(offset <= 0xFFF, "VersionIdx offset overflow (max 12 bits)");
VersionPointer::new((generation << 32) | (chunk << 12) | offset)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core_types::{InProcessPageLockTable, TransactionMode, TransactionState};
use fsqlite_types::{PageData, SchemaEpoch, TxnEpoch, TxnToken};
use proptest::prelude::*;
fn make_snapshot(high: u64) -> Snapshot {
Snapshot::new(CommitSeq::new(high), SchemaEpoch::ZERO)
}
fn make_version(pgno: u32, commit_seq: u64, prev: Option<VersionPointer>) -> PageVersion {
PageVersion {
pgno: PageNumber::new(pgno).unwrap(),
commit_seq: CommitSeq::new(commit_seq),
created_by: TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0)),
data: PageData::zeroed(PageSize::DEFAULT),
prev,
}
}
#[test]
fn test_inv1_txnid_monotonic_cas_loop() {
let mgr = TxnManager::default();
let mut prev = 0_u64;
for _ in 0..1000 {
let id = mgr.alloc_txn_id().expect("should not exhaust id space");
let raw = id.get();
assert!(
raw > prev,
"TxnId must be strictly increasing: {raw} <= {prev}"
);
assert_ne!(raw, 0, "TxnId must never be zero");
assert!(raw <= TxnId::MAX_RAW, "TxnId must not exceed MAX_RAW");
prev = raw;
}
}
#[test]
fn test_inv1_txnid_exhaustion() {
let mgr = TxnManager::new(TxnId::MAX_RAW, 1);
let id = mgr.alloc_txn_id();
assert!(id.is_some(), "should allocate the last valid TxnId");
assert_eq!(id.unwrap().get(), TxnId::MAX_RAW);
let id = mgr.alloc_txn_id();
assert!(id.is_none(), "should fail when id space is exhausted");
}
#[test]
fn test_inv1_commit_seq_monotonic() {
let mgr = TxnManager::default();
let mut prev = CommitSeq::ZERO;
for _ in 0..100 {
let seq = mgr.alloc_commit_seq();
assert!(seq > prev, "CommitSeq must be strictly increasing");
prev = seq;
}
}
#[test]
fn test_inv1_txnid_multithreaded_monotonicity() {
use std::sync::Arc;
let mgr = Arc::new(TxnManager::default());
let mut handles = Vec::new();
for _ in 0..4 {
let mgr = Arc::clone(&mgr);
handles.push(std::thread::spawn(move || {
let mut ids = Vec::with_capacity(250);
for _ in 0..250 {
ids.push(mgr.alloc_txn_id().unwrap().get());
}
ids
}));
}
let mut all_ids: Vec<u64> = handles
.into_iter()
.flat_map(|h| h.join().unwrap())
.collect();
let unique_count = {
let mut sorted = all_ids.clone();
sorted.sort_unstable();
sorted.dedup();
sorted.len()
};
assert_eq!(unique_count, 1000, "all TxnIds must be unique");
all_ids.sort_unstable();
for window in all_ids.windows(2) {
assert!(
window[0] < window[1],
"global TxnId sequence must be strictly increasing: {} >= {}",
window[0],
window[1]
);
}
}
#[test]
fn test_bd6883_first_attempt_ratio_64_threads_moderate_contention() {
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
const THREADS: u32 = 64;
const INSTALLS_PER_THREAD: u32 = 256;
const PAGE_FANOUT: u32 = 512;
const BEAD_ID: &str = "bd-688.3";
const RUN_ID: &str = "bd6883-cas-ratio-run";
const TRACE_ID: &str = "bd6883-cas-ratio-trace";
const SCENARIO_ID: &str = "cas_first_attempt_ratio_moderate_contention";
let chain_heads = Arc::new(ChainHeadTable::new());
let next_idx_raw = Arc::new(AtomicU64::new(0));
let first_attempts = Arc::new(AtomicU64::new(0));
let total_installs = Arc::new(AtomicU64::new(0));
let handles: Vec<_> = (0..THREADS)
.map(|tid| {
let chain_heads = Arc::clone(&chain_heads);
let next_idx_raw = Arc::clone(&next_idx_raw);
let first_attempts = Arc::clone(&first_attempts);
let total_installs = Arc::clone(&total_installs);
thread::spawn(move || {
for op in 0..INSTALLS_PER_THREAD {
let global = tid * INSTALLS_PER_THREAD + op;
let pgno = PageNumber::new((global % PAGE_FANOUT) + 1)
.expect("page number must be non-zero");
let raw = next_idx_raw.fetch_add(1, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
let chunk = (raw / 4096) as u32;
#[allow(clippy::cast_possible_truncation)]
let offset = (raw % 4096) as u32;
let idx = VersionIdx::new(chunk, offset, 1);
let (_previous, attempts) = chain_heads.install_with_retry(pgno, idx);
total_installs.fetch_add(1, Ordering::Relaxed);
if attempts == 1 {
first_attempts.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("stress thread must not panic");
}
let total = total_installs.load(Ordering::Relaxed);
let first = first_attempts.load(Ordering::Relaxed);
assert_eq!(
total,
u64::from(THREADS) * u64::from(INSTALLS_PER_THREAD),
"all install attempts must be accounted for"
);
#[allow(clippy::cast_precision_loss)]
let ratio = first as f64 / total as f64;
tracing::info!(
bead_id = BEAD_ID,
run_id = RUN_ID,
trace_id = TRACE_ID,
scenario_id = SCENARIO_ID,
total_installs = total,
first_attempts = first,
first_attempt_ratio = ratio,
"chain-head CAS first-attempt ratio stress result"
);
assert!(
ratio >= 0.95,
"bead_id={BEAD_ID} run_id={RUN_ID} trace_id={TRACE_ID} scenario_id={SCENARIO_ID} expected first-attempt ratio >= 0.95, got {ratio:.6}"
);
}
#[test]
fn loom_chain_head_publication_linearizable() {
use loom::sync::Arc;
use loom::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use loom::thread;
loom::model(|| {
const EMPTY: u64 = u64::MAX;
const HEAD_A: u64 = 0x1001;
const HEAD_B: u64 = 0x2002;
let head = Arc::new(AtomicU64::new(EMPTY));
let completions = Arc::new(AtomicUsize::new(0));
let spawn_installer =
|next_head: u64, head: Arc<AtomicU64>, completions: Arc<AtomicUsize>| {
thread::spawn(move || {
loop {
let current = head.load(Ordering::Acquire);
if head
.compare_exchange(
current,
next_head,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
completions.fetch_add(1, Ordering::Release);
break;
}
}
})
};
let thread_a = spawn_installer(HEAD_A, Arc::clone(&head), Arc::clone(&completions));
let thread_b = spawn_installer(HEAD_B, Arc::clone(&head), Arc::clone(&completions));
thread_a.join().expect("loom installer A must join");
thread_b.join().expect("loom installer B must join");
let final_head = head.load(Ordering::Acquire);
assert!(
final_head == HEAD_A || final_head == HEAD_B,
"final head must equal one of the published values"
);
assert_eq!(
completions.load(Ordering::Acquire),
2,
"both installers must eventually complete"
);
});
}
#[test]
fn test_inv2_page_lock_exclusivity() {
let table = InProcessPageLockTable::new();
let page = PageNumber::new(42).unwrap();
let txn_a = TxnId::new(1).unwrap();
let txn_b = TxnId::new(2).unwrap();
assert!(table.try_acquire(page, txn_a).is_ok());
let err = table.try_acquire(page, txn_b);
assert_eq!(err, Err(txn_a), "second txn must see the holder");
assert!(table.try_acquire(page, txn_a).is_ok());
assert!(table.release(page, txn_a));
assert!(table.try_acquire(page, txn_b).is_ok());
}
#[test]
fn test_inv3_version_chain_descending() {
let store = VersionStore::new(PageSize::DEFAULT);
let pgno = PageNumber::new(1).unwrap();
let mut prev_ptr: Option<VersionPointer> = None;
for seq in 1..=5_u64 {
let version = PageVersion {
pgno,
commit_seq: CommitSeq::new(seq),
created_by: TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(0)),
data: PageData::zeroed(PageSize::DEFAULT),
prev: prev_ptr,
};
let idx = store.publish(version);
prev_ptr = Some(idx_to_version_pointer(idx));
}
let chain = store.walk_chain(pgno);
assert_eq!(chain.len(), 5);
for window in chain.windows(2) {
assert!(
window[0].commit_seq > window[1].commit_seq,
"version chain must be strictly descending: {} <= {}",
window[0].commit_seq.get(),
window[1].commit_seq.get()
);
}
}
#[test]
fn test_inv4_write_set_requires_lock() {
let table = InProcessPageLockTable::new();
let txn_id = TxnId::new(1).unwrap();
let snap = make_snapshot(0);
let mut txn = Transaction::new(txn_id, TxnEpoch::new(0), snap, TransactionMode::Concurrent);
let page = PageNumber::new(10).unwrap();
table.try_acquire(page, txn_id).unwrap();
txn.page_locks.insert(page);
txn.write_set.push(page);
for &p in &txn.write_set {
assert!(
txn.page_locks.contains(&p),
"INV-4 violated: page {p:?} in write_set but not in page_locks"
);
}
}
#[test]
fn test_inv5_deferred_snapshot_provisional() {
let txn_id = TxnId::new(1).unwrap();
let provisional_snap = make_snapshot(0);
let mut txn = Transaction::new(
txn_id,
TxnEpoch::new(0),
provisional_snap,
TransactionMode::Serialized,
);
txn.snapshot_established = false;
assert!(
!txn.snapshot_established,
"DEFERRED snapshot should be provisional"
);
let current_high = CommitSeq::new(5);
txn.snapshot = Snapshot::new(current_high, SchemaEpoch::ZERO);
txn.snapshot_established = true;
assert!(
txn.snapshot_established,
"snapshot should now be established"
);
assert_eq!(txn.snapshot.high, current_high);
let established = txn.snapshot;
assert_eq!(established.high.get(), 5);
}
#[test]
fn test_inv6_commit_atomicity_all_visible_or_none() {
let store = VersionStore::new(PageSize::DEFAULT);
let pages = [1_u32, 2, 3];
for &p in &pages {
let version = make_version(p, 5, None);
store.publish(version);
}
let snap_before = make_snapshot(4);
for &p in &pages {
let pgno = PageNumber::new(p).unwrap();
assert!(
store.resolve(pgno, &snap_before).is_none(),
"page {p} should NOT be visible at snapshot high=4"
);
}
let snap_at = make_snapshot(5);
for &p in &pages {
let pgno = PageNumber::new(p).unwrap();
assert!(
store.resolve(pgno, &snap_at).is_some(),
"page {p} should be visible at snapshot high=5"
);
}
let snap_after = make_snapshot(10);
for &p in &pages {
let pgno = PageNumber::new(p).unwrap();
assert!(
store.resolve(pgno, &snap_after).is_some(),
"page {p} should be visible at snapshot high=10"
);
}
}
#[test]
fn test_inv7_serialized_write_mutex_exclusivity() {
let mutex = SerializedWriteMutex::new();
let txn_a = TxnId::new(1).unwrap();
let txn_b = TxnId::new(2).unwrap();
assert!(mutex.try_acquire(txn_a).is_ok());
assert_eq!(mutex.holder(), Some(txn_a));
assert_eq!(mutex.try_acquire(txn_b), Err(txn_a));
assert!(mutex.try_acquire(txn_a).is_ok());
assert!(mutex.release(txn_a));
assert!(mutex.holder().is_none());
assert!(mutex.try_acquire(txn_b).is_ok());
assert_eq!(mutex.holder(), Some(txn_b));
assert!(mutex.release(txn_b));
}
#[test]
fn test_visible_predicate_committed_within_range() {
let snap = make_snapshot(10);
let v5 = make_version(1, 5, None);
assert!(visible(&v5, &snap));
let v10 = make_version(1, 10, None);
assert!(visible(&v10, &snap));
let v15 = make_version(1, 15, None);
assert!(!visible(&v15, &snap));
let v0 = make_version(1, 0, None);
assert!(!visible(&v0, &snap));
}
#[test]
fn test_resolve_returns_first_visible_from_head() {
let store = VersionStore::new(PageSize::DEFAULT);
let pgno = PageNumber::new(1).unwrap();
let v1 = make_version(1, 1, None);
let idx1 = store.publish(v1);
let v2 = make_version(1, 5, Some(idx_to_version_pointer(idx1)));
let idx2 = store.publish(v2);
let v3 = make_version(1, 10, Some(idx_to_version_pointer(idx2)));
store.publish(v3);
let snap = make_snapshot(7);
let resolved = store.resolve(pgno, &snap).unwrap();
let version = store.get_version(resolved).unwrap();
assert_eq!(
version.commit_seq,
CommitSeq::new(5),
"should resolve to V2 (seq=5)"
);
let snap_at_ten = make_snapshot(10);
let resolved_ten = store.resolve(pgno, &snap_at_ten).unwrap();
let version_ten = store.get_version(resolved_ten).unwrap();
assert_eq!(version_ten.commit_seq, CommitSeq::new(10));
let snap_at_zero = make_snapshot(0);
assert!(store.resolve(pgno, &snap_at_zero).is_none());
}
#[test]
fn test_version_visibility_ranges_track_begin_end_timestamps() {
    // Publish seq 1 -> 5 -> 10 for page 1. Each version's visibility window
    // begins at its own commit_seq and ends at its successor's commit_seq;
    // the chain head's window is open-ended (end_ts == None).
    let store = VersionStore::new(PageSize::DEFAULT);
    let mut prev = None;
    let mut indices = Vec::new();
    for seq in [1, 5, 10] {
        let idx = store.publish(make_version(1, seq, prev));
        prev = Some(idx_to_version_pointer(idx));
        indices.push(idx);
    }
    let expected = [
        (CommitSeq::new(1), Some(CommitSeq::new(5))),
        (CommitSeq::new(5), Some(CommitSeq::new(10))),
        (CommitSeq::new(10), None),
    ];
    for (idx, (begin, end)) in indices.into_iter().zip(expected) {
        let range = store.visibility_range(idx).unwrap();
        assert_eq!(range.begin_ts, begin);
        assert_eq!(range.end_ts, end);
    }
}
#[test]
fn test_resolve_with_trace_reports_versions_traversed() {
    // Chain: seq 1 -> 5 -> 10 (head). The tracing resolve reports how many
    // versions were examined walking back from the head.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(1).unwrap();
    let mut prev = None;
    for seq in [1, 5, 10] {
        let idx = store.publish(make_version(1, seq, prev));
        prev = Some(idx_to_version_pointer(idx));
    }

    // high=7 skips the head (seq=10) and lands on seq=5: two versions walked.
    let trace = store.resolve_with_trace(pgno, &make_snapshot(7));
    let resolved_seq = trace
        .version_idx
        .map(|idx| store.get_version(idx).unwrap().commit_seq);
    assert_eq!(resolved_seq, Some(CommitSeq::new(5)));
    assert_eq!(trace.versions_traversed, 2);

    // high=10 accepts the head immediately: exactly one version walked.
    let head_trace = store.resolve_with_trace(pgno, &make_snapshot(10));
    assert_eq!(head_trace.versions_traversed, 1);
}
#[test]
fn test_resolve_visible_version_returns_first_visible_from_head() {
    // Chain for page 1: seq 1 -> 5 -> 10 (head).
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(1).unwrap();
    let mut prev = None;
    for seq in [1, 5, 10] {
        let idx = store.publish(make_version(1, seq, prev));
        prev = Some(idx_to_version_pointer(idx));
    }

    // resolve_visible_version returns the newest version at or below high.
    let visible = store
        .resolve_visible_version(pgno, &make_snapshot(7))
        .expect("snapshot should resolve to the first visible version");
    assert_eq!(visible.commit_seq, CommitSeq::new(5));
    let latest = store
        .resolve_visible_version(pgno, &make_snapshot(10))
        .expect("snapshot at chain head should resolve");
    assert_eq!(latest.commit_seq, CommitSeq::new(10));

    // The commit-seq-only variant must agree with the version-returning one.
    assert_eq!(
        store.resolve_visible_commit_seq(pgno, &make_snapshot(7)),
        Some(CommitSeq::new(5))
    );
    assert_eq!(
        store.resolve_visible_commit_seq(pgno, &make_snapshot(10)),
        Some(CommitSeq::new(10))
    );

    // A snapshot that precedes every commit sees nothing from either API.
    assert!(
        store
            .resolve_visible_version(pgno, &make_snapshot(0))
            .is_none(),
        "snapshot before first commit should not see a version"
    );
    assert_eq!(
        store.resolve_visible_commit_seq(pgno, &make_snapshot(0)),
        None
    );
}
#[test]
fn test_chain_head_version_returns_latest_version() {
    // Publish seq 1 -> 5 -> 10; the chain head is always the newest commit,
    // independent of any snapshot.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(1).unwrap();
    let mut prev = None;
    for seq in [1, 5, 10] {
        let idx = store.publish(make_version(1, seq, prev));
        prev = Some(idx_to_version_pointer(idx));
    }
    let head = store
        .chain_head_version(pgno)
        .expect("published page should have a latest chain head");
    assert_eq!(head.commit_seq, CommitSeq::new(10));
}
#[test]
fn test_resolve_for_txn_checks_write_set_first() {
    // One committed version for page 1. The transaction also lists page 1 in
    // its write set; resolve_for_txn must still find the committed version,
    // and must return None for a page with no versions at all.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(1).unwrap();
    store.publish(make_version(1, 1, None));

    let txn_id = TxnId::new(2).unwrap();
    let snap = make_snapshot(1);
    let mut txn = Transaction::new(txn_id, TxnEpoch::new(0), snap, TransactionMode::Concurrent);
    txn.write_set.push(pgno);

    assert!(
        store.resolve_for_txn(pgno, &txn).is_some(),
        "should resolve even with write_set entry"
    );
    let other_page = PageNumber::new(99).unwrap();
    assert!(
        store.resolve_for_txn(other_page, &txn).is_none(),
        "page 99 has no versions"
    );
}
#[test]
#[allow(clippy::too_many_lines)]
// Worked example: five transactions contend on page P1 under
// first-committer-wins (FCW). T1 commits V1 (seq=1); T2 and T3 (snapshots at
// high=0) then fail the FCW check against V1 and abort; T4 reads at high=1;
// T5 (snapshot high=1) passes FCW and commits V2 (seq=2).
fn test_worked_example_5txn_scenario() {
let mgr = TxnManager::default();
let store = VersionStore::new(PageSize::DEFAULT);
let lock_table = InProcessPageLockTable::new();
let p1 = PageNumber::new(1).unwrap();
// T1 and T2 both begin at the empty-database snapshot (high=0).
let t1_id = mgr.alloc_txn_id().unwrap();
let snap0 = make_snapshot(0);
let mut t1 = Transaction::new(t1_id, TxnEpoch::new(0), snap0, TransactionMode::Concurrent);
let t2_id = mgr.alloc_txn_id().unwrap();
let mut t2 = Transaction::new(t2_id, TxnEpoch::new(0), snap0, TransactionMode::Concurrent);
// T1 takes the page lock for P1 and records the write (write-set pages
// must be covered by page locks).
lock_table.try_acquire(p1, t1_id).unwrap();
t1.page_locks.insert(p1);
t1.write_set.push(p1);
let t3_id = mgr.alloc_txn_id().unwrap();
let mut t3 = Transaction::new(t3_id, TxnEpoch::new(0), snap0, TransactionMode::Concurrent);
// T1 commits V1 at seq=1, then releases its locks.
let seq1 = mgr.alloc_commit_seq();
assert_eq!(seq1.get(), 1);
let v1 = PageVersion {
pgno: p1,
commit_seq: seq1,
created_by: t1.token(),
data: PageData::zeroed(PageSize::DEFAULT),
prev: None,
};
store.publish(v1);
lock_table.release_all(t1_id);
t1.commit();
// T2 now locks P1 and attempts its own write.
lock_table.try_acquire(p1, t2_id).unwrap();
t2.page_locks.insert(p1);
t2.write_set.push(p1);
// T4 begins after T1's commit, so its snapshot is at high=1.
let t4_id = mgr.alloc_txn_id().unwrap();
let snap1 = make_snapshot(1);
let t4 = Transaction::new(t4_id, TxnEpoch::new(0), snap1, TransactionMode::Concurrent);
// Fetch the current chain head: a snapshot at u64::MAX sees every version.
let base = store.resolve(
p1,
&Snapshot::new(CommitSeq::new(u64::MAX), SchemaEpoch::ZERO),
);
let base_version = store.get_version(base.unwrap()).unwrap();
// FCW check for T2: the base version (seq=1) is newer than T2's snapshot
// (high=0), so T2 must abort.
let fcw_fail_t2 = base_version.commit_seq.get() > t2.snapshot.high.get();
assert!(
fcw_fail_t2,
"T2 must fail FCW: base seq=1 > snapshot high=0"
);
lock_table.release_all(t2_id);
t2.abort();
assert_eq!(t2.state, TransactionState::Aborted);
let t5_id = mgr.alloc_txn_id().unwrap();
let mut t5 = Transaction::new(t5_id, TxnEpoch::new(0), snap1, TransactionMode::Concurrent);
// T3 (also at high=0) fails the same FCW check and aborts.
lock_table.try_acquire(p1, t3_id).unwrap();
t3.page_locks.insert(p1);
t3.write_set.push(p1);
let fcw_fail_t3 = base_version.commit_seq.get() > t3.snapshot.high.get();
assert!(
fcw_fail_t3,
"T3 must fail FCW: base seq=1 > snapshot high=0"
);
lock_table.release_all(t3_id);
t3.abort();
assert_eq!(t3.state, TransactionState::Aborted);
// T5's snapshot (high=1) already includes V1, so its FCW check passes.
lock_table.try_acquire(p1, t5_id).unwrap();
t5.page_locks.insert(p1);
t5.write_set.push(p1);
let fcw_pass_t5 = base_version.commit_seq.get() <= t5.snapshot.high.get();
assert!(
fcw_pass_t5,
"T5 must pass FCW: base seq=1 <= snapshot high=1"
);
// T5 commits V2 at seq=2, linked behind the current head (V1).
let seq2 = mgr.alloc_commit_seq();
assert_eq!(seq2.get(), 2);
let head_idx = store.chain_head(p1).unwrap();
let v2 = PageVersion {
pgno: p1,
commit_seq: seq2,
created_by: t5.token(),
data: PageData::zeroed(PageSize::DEFAULT),
prev: Some(idx_to_version_pointer(head_idx)),
};
store.publish(v2);
lock_table.release_all(t5_id);
t5.commit();
// Visibility checks: high=0 sees nothing, high=1 sees V1, high=2 sees V2.
let snap_t2 = make_snapshot(0);
assert!(store.resolve(p1, &snap_t2).is_none());
let resolved_t4 = store.resolve(p1, &t4.snapshot).unwrap();
let ver_t4 = store.get_version(resolved_t4).unwrap();
assert_eq!(ver_t4.commit_seq.get(), 1, "T4 should see V1");
let resolved_t5_before = store.resolve(p1, &snap1).unwrap();
let ver_t5 = store.get_version(resolved_t5_before).unwrap();
assert_eq!(
ver_t5.commit_seq.get(),
1,
"T5 should see V1 at snap high=1"
);
let snap2 = make_snapshot(2);
let resolved_snap2 = store.resolve(p1, &snap2).unwrap();
let ver_snap2 = store.get_version(resolved_snap2).unwrap();
assert_eq!(
ver_snap2.commit_seq.get(),
2,
"snapshot high=2 should see V2"
);
// Final chain: head V2 (seq=2), then V1 (seq=1).
let chain = store.walk_chain(p1);
assert_eq!(chain.len(), 2, "should have 2 committed versions");
assert_eq!(chain[0].commit_seq.get(), 2, "head should be V2");
assert_eq!(chain[1].commit_seq.get(), 1, "tail should be V1");
}
#[test]
fn test_theorem4_gc_never_removes_needed_version() {
    // Theorem 4 safety: GC must never reclaim a version that some active
    // snapshot still resolves to. Chain for page 7: seq 1 -> 5 -> 10.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(7).unwrap();
    let idx1 = store.publish(make_version(7, 1, None));
    let idx2 = store.publish(make_version(7, 5, Some(idx_to_version_pointer(idx1))));
    let idx3 = store.publish(make_version(7, 10, Some(idx_to_version_pointer(idx2))));

    // An active snapshot at high=7 resolves to V2 (seq=5), so V2 is needed.
    let active_snap = make_snapshot(7);
    let visible_idx = store.resolve(pgno, &active_snap).unwrap();
    let visible = store.get_version(visible_idx).unwrap();
    assert_eq!(
        visible.commit_seq.get(),
        5,
        "active snapshot must keep V2 reachable"
    );

    // Reclaimability: a version may go only when it is below the horizon AND
    // its successor is already at-or-below the horizon.
    let gc_horizon = CommitSeq::new(7);
    let v1_ref = store.get_version(idx1).unwrap();
    let v2_ref = store.get_version(idx2).unwrap();
    let v3_ref = store.get_version(idx3).unwrap();
    let v1_reclaimable = v1_ref.commit_seq < gc_horizon && v2_ref.commit_seq <= gc_horizon;
    let v2_reclaimable = v2_ref.commit_seq < gc_horizon && v3_ref.commit_seq <= gc_horizon;
    let v3_reclaimable = v3_ref.commit_seq < gc_horizon;
    assert!(v1_reclaimable, "V1 should be reclaimable");
    assert!(!v2_reclaimable, "V2 must be retained for snapshot high=7");
    assert!(!v3_reclaimable, "head version is never reclaimable here");
}
#[test]
fn test_theorem5_version_chain_bounded_by_rd_plus_1() {
    // Theorem 5: with commit rate R and max transaction duration D, a page's
    // version chain needs at most R*D + 1 entries in a bounded workload.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(11).unwrap();
    let commit_rate_per_sec = 100_u64;
    let max_txn_duration_secs = 1_u64;
    let bound = commit_rate_per_sec * max_txn_duration_secs + 1;

    // Publish exactly `bound` versions, each linked to its predecessor.
    let mut prev: Option<VersionPointer> = None;
    for seq in 1..=bound {
        let idx = store.publish(PageVersion {
            pgno,
            commit_seq: CommitSeq::new(seq),
            created_by: TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(0)),
            data: PageData::zeroed(PageSize::DEFAULT),
            prev,
        });
        prev = Some(idx_to_version_pointer(idx));
    }
    let chain = store.walk_chain(pgno);
    assert_eq!(
        chain.len(),
        usize::try_from(bound).unwrap(),
        "version chain should respect R*D+1 bound in bounded workload"
    );
}
#[test]
fn test_theorem4_no_active_txns_gc_all_but_latest() {
    // With no active snapshots, the horizon advances to the newest commit and
    // every version except the chain head becomes reclaimable.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(12).unwrap();
    let mut prev: Option<VersionPointer> = None;
    for seq in 1_u64..=3 {
        let idx = store.publish(PageVersion {
            pgno,
            commit_seq: CommitSeq::new(seq),
            created_by: TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(0)),
            data: PageData::zeroed(PageSize::DEFAULT),
            prev,
        });
        prev = Some(idx_to_version_pointer(idx));
    }
    let horizon = CommitSeq::new(3);
    let chain = store.walk_chain(pgno);
    assert_eq!(chain.len(), 3);
    assert_eq!(chain[0].commit_seq, CommitSeq::new(3));

    // Count adjacent (newer, older) pairs where the older version satisfies
    // the reclaimability predicate against the horizon.
    let reclaimable = chain
        .windows(2)
        .filter(|pair| pair[1].commit_seq < horizon && pair[0].commit_seq <= horizon)
        .count();
    assert_eq!(reclaimable, 2, "older versions should be reclaimable");
}
#[test]
fn test_theorem4_gc_horizon_min_active_snapshot() {
    // The safe GC horizon is the minimum `high` across all active snapshots:
    // reclaiming anything at or above it could strand an active reader.
    let active_highs = [CommitSeq::new(10), CommitSeq::new(20), CommitSeq::new(30)];
    let safe_gc_seq = active_highs.into_iter().min().unwrap();
    assert_eq!(
        safe_gc_seq,
        CommitSeq::new(10),
        "gc horizon must track min active snapshot.high"
    );
}
#[test]
fn test_theorem4_reclaimability_predicate() {
    // Chain for page 13: seq 3 -> 5 -> 9, with the GC horizon at 7.
    let store = VersionStore::new(PageSize::DEFAULT);
    let idx1 = store.publish(make_version(13, 3, None));
    let idx2 = store.publish(make_version(13, 5, Some(idx_to_version_pointer(idx1))));
    let idx3 = store.publish(make_version(13, 9, Some(idx_to_version_pointer(idx2))));
    let horizon = CommitSeq::new(7);
    let v1_ref = store.get_version(idx1).unwrap();
    let v2_ref = store.get_version(idx2).unwrap();
    let v3_ref = store.get_version(idx3).unwrap();

    // A version is reclaimable when it is below the horizon and its successor
    // is at-or-below the horizon, so no snapshot can still need it.
    let v1_reclaimable = v1_ref.commit_seq < horizon && v2_ref.commit_seq <= horizon;
    let v2_reclaimable = v2_ref.commit_seq < horizon && v3_ref.commit_seq <= horizon;
    assert!(v1_reclaimable, "V1 should satisfy reclaimability predicate");
    assert!(
        !v2_reclaimable,
        "V2 must be retained because newer V3 is beyond horizon"
    );
}
#[test]
// NOTE(review): this test only delegates to
// `test_theorem5_version_chain_bounded_by_rd_plus_1`, so the same scenario
// runs twice per test pass. Presumably kept as a stable alias for an older
// test name — confirm, and consider removing the duplicate.
fn test_theorem5_version_chain_bounded() {
test_theorem5_version_chain_bounded_by_rd_plus_1();
}
#[test]
fn test_theorem5_gc_prunes_old_versions() {
    // Publish 32 versions for page 14. Once the horizon reaches the head's
    // commit, every non-head version is reclaimable.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(14).unwrap();
    let mut prev: Option<VersionPointer> = None;
    for seq in 1_u64..=32 {
        let idx = store.publish(PageVersion {
            pgno,
            commit_seq: CommitSeq::new(seq),
            created_by: TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(0)),
            data: PageData::zeroed(PageSize::DEFAULT),
            prev,
        });
        prev = Some(idx_to_version_pointer(idx));
    }
    let chain = store.walk_chain(pgno);
    let horizon = CommitSeq::new(32);

    // Skip the head (chain[0]); everything behind it is at-or-below horizon.
    let reclaimable = chain
        .iter()
        .skip(1)
        .filter(|version| version.commit_seq <= horizon)
        .count();
    assert_eq!(reclaimable, 31, "all non-head versions are reclaimable");
    assert_eq!(chain[0].commit_seq, CommitSeq::new(32));
}
#[test]
fn test_prune_page_chain_eager_enqueues_slots_for_ebr_recycling() {
    // Eager pruning must not free arena slots immediately: retired slots are
    // queued and only recycled later via the EBR path.
    let store = VersionStore::new(PageSize::DEFAULT);
    let pgno = PageNumber::new(16).unwrap();
    for seq in [1, 2, 3] {
        store.publish(make_version(16, seq, None));
    }
    assert_eq!(store.chain_length(pgno), 3);
    assert_eq!(store.pending_recycle_count(), 0);

    let freed = store.prune_page_chain_eager(pgno, CommitSeq::new(2));
    assert_eq!(freed, 1, "horizon=2 should prune only the oldest version");
    assert_eq!(store.chain_length(pgno), 2);
    assert_eq!(
        store.pending_recycle_count(),
        1,
        "eager prune must queue retired slots for later EBR recycling"
    );

    // try_recycle_retired_slots(1) reclaims nothing, while (2) reclaims the
    // queued slot — the argument looks like an epoch/grace threshold; confirm
    // against the method's definition.
    assert_eq!(store.try_recycle_retired_slots(1), 0);
    assert_eq!(store.try_recycle_retired_slots(2), 1);
    assert_eq!(store.pending_recycle_count(), 0);
}
proptest! {
#[test]
// GC safety property: for any horizon, the version that a snapshot at
// `horizon` resolves to must never satisfy the reclaimability predicate
// while that snapshot is active.
fn prop_gc_safety_holds(horizon in 1_u64..40_u64) {
let store = VersionStore::new(PageSize::DEFAULT);
let pgno = PageNumber::new(15).unwrap();
// Build a chain of horizon+2 versions (seq 1..=horizon+2), newest at head.
let mut prev: Option<VersionPointer> = None;
for seq in 1_u64..=horizon + 2 {
let version = PageVersion {
pgno,
commit_seq: CommitSeq::new(seq),
created_by: TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(0)),
data: PageData::zeroed(PageSize::DEFAULT),
prev,
};
let idx = store.publish(version);
prev = Some(idx_to_version_pointer(idx));
}
// The active snapshot resolves to exactly seq == horizon …
let active_snapshot = make_snapshot(horizon);
let visible_idx = store.resolve(pgno, &active_snapshot).expect("visible version must exist");
let visible = store.get_version(visible_idx).expect("arena lookup must succeed");
prop_assert_eq!(visible.commit_seq, CommitSeq::new(horizon));
// … and that version is not strictly below the snapshot's high, so it
// cannot be reclaimed out from under the active reader.
let visible_reclaimable = visible.commit_seq < active_snapshot.high;
prop_assert!(!visible_reclaimable);
}
}
#[test]
// End-to-end schedule of 10 writers on disjoint pages (every third one in
// Serialized mode) checking invariants INV-1 (monotonic TxnIds), INV-4
// (write-set pages are locked), INV-6 (committed pages are visible), and
// INV-7 (serialized-write mutex is released).
fn test_e2e_invariants_under_concurrent_schedule() {
let mgr = TxnManager::default();
let store = VersionStore::new(PageSize::DEFAULT);
let lock_table = InProcessPageLockTable::new();
let write_mutex = SerializedWriteMutex::new();
let mut committed_ids = Vec::new();
for i in 1..=10_u64 {
let id = mgr.alloc_txn_id().unwrap();
let snap = make_snapshot(i.saturating_sub(1));
// Every third transaction runs Serialized; the rest are Concurrent.
let mode = if i % 3 == 0 {
TransactionMode::Serialized
} else {
TransactionMode::Concurrent
};
let mut txn = Transaction::new(id, TxnEpoch::new(0), snap, mode);
let pgno = PageNumber::new(u32::try_from(i).unwrap()).unwrap();
// Serialized writers must hold the global write mutex (INV-7).
if txn.mode == TransactionMode::Serialized {
write_mutex.try_acquire(txn.txn_id).unwrap();
txn.serialized_write_lock_held = true;
}
lock_table.try_acquire(pgno, txn.txn_id).unwrap();
txn.page_locks.insert(pgno);
txn.write_set.push(pgno);
// INV-4: every page in the write set is covered by a page lock.
for &p in &txn.write_set {
assert!(txn.page_locks.contains(&p), "INV-4 violated for {p:?}");
}
let seq = mgr.alloc_commit_seq();
let version = PageVersion {
pgno,
commit_seq: seq,
created_by: txn.token(),
data: PageData::zeroed(PageSize::DEFAULT),
prev: None,
};
store.publish(version);
// Release the page locks (and the serialized mutex, if held), then commit.
lock_table.release_all(txn.txn_id);
if txn.serialized_write_lock_held {
write_mutex.release(txn.txn_id);
txn.serialized_write_lock_held = false;
}
txn.commit();
committed_ids.push(txn.txn_id.get());
}
// INV-1: TxnIds were allocated in strictly increasing order.
for window in committed_ids.windows(2) {
assert!(window[0] < window[1], "INV-1: TxnIds must be increasing");
}
assert_eq!(lock_table.lock_count(), 0, "all locks must be released");
// INV-6: every committed page resolves under a snapshot covering all commits.
let snap_all = make_snapshot(10);
for i in 1..=10_u32 {
let pgno = PageNumber::new(i).unwrap();
assert!(
store.resolve(pgno, &snap_all).is_some(),
"INV-6: page {} must be visible at high=10",
i
);
}
assert!(
write_mutex.holder().is_none(),
"INV-7: mutex must be released"
);
}
#[test]
fn test_e2e_version_resolve_allocation_free() {
    const BEAD_22N8: &str = "bd-22n.8";
    // Chain for page 1: seq 1 -> 3 -> 5 (head). Repeated resolves with the
    // same snapshot must keep returning the identical VersionIdx.
    let store = VersionStore::new(PageSize::DEFAULT);
    let p1 = PageNumber::new(1).unwrap();
    let mut prev = None;
    for seq in [1, 3, 5] {
        let idx = store.publish(make_version(1, seq, prev));
        prev = Some(idx_to_version_pointer(idx));
    }

    let snap = make_snapshot(4);
    let first_idx = store.resolve(p1, &snap).unwrap();
    for round in 0..100u32 {
        let idx = store.resolve(p1, &snap).unwrap();
        assert_eq!(
            idx, first_idx,
            "bead_id={BEAD_22N8} case=e2e_version_resolve_stable \
             round={round} resolve must return same VersionIdx"
        );
    }
    let resolved = store.get_version(first_idx).unwrap();
    assert_eq!(
        resolved.commit_seq,
        CommitSeq::new(3),
        "bead_id={BEAD_22N8} case=e2e_resolved_correct_version"
    );

    // The newest snapshot resolves the head; a pre-commit snapshot sees nothing.
    let snap5 = make_snapshot(5);
    let idx5 = store.resolve(p1, &snap5).unwrap();
    let v5_resolved = store.get_version(idx5).unwrap();
    assert_eq!(
        v5_resolved.commit_seq,
        CommitSeq::new(5),
        "bead_id={BEAD_22N8} case=e2e_latest_version_resolved"
    );
    let snap0 = make_snapshot(0);
    assert!(
        store.resolve(p1, &snap0).is_none(),
        "bead_id={BEAD_22N8} case=e2e_no_visible_version_at_zero"
    );
}
proptest! {
#[test]
// An uncommitted version (commit_seq == 0) must be invisible to every
// snapshot, whatever its high watermark.
fn prop_visible_uncommitted_never_visible(
high in 0_u64..1_000_000,
) {
let snap = make_snapshot(high);
let uncommitted = make_version(1, 0, None);
prop_assert!(!visible(&uncommitted, &snap), "uncommitted (seq=0) must never be visible");
}
#[test]
// A committed version is visible exactly when its commit_seq lies within the
// snapshot range, i.e. seq <= high.
fn prop_visible_committed_iff_in_range(
seq in 1_u64..1_000_000,
high in 0_u64..1_000_000,
) {
let snap = make_snapshot(high);
let version = make_version(1, seq, None);
let expected = seq <= high;
prop_assert_eq!(
visible(&version, &snap),
expected,
"visible(seq={}, high={}) should be {}", seq, high, expected
);
}
#[test]
// TxnManager must never hand out the same TxnId twice.
fn prop_txn_manager_ids_unique(
count in 1_usize..500,
) {
let mgr = TxnManager::default();
let mut ids = Vec::with_capacity(count);
for _ in 0..count {
ids.push(mgr.alloc_txn_id().unwrap().get());
}
let mut deduped = ids.clone();
deduped.sort_unstable();
deduped.dedup();
prop_assert_eq!(ids.len(), deduped.len(), "all TxnIds must be unique");
}
#[test]
// Snapshot-isolation oracle test: replay a multi-writer history into the
// store while recording each page's commit seqs, then check that resolve()
// agrees with the oracle (max committed seq <= high) for arbitrary
// snapshots, and that resolve() is deterministic.
fn prop_snapshot_isolation_multi_writer(
num_writers in 2_usize..8,
ops_per_writer in 1_usize..20,
num_pages in 1_u32..16,
snapshot_highs in proptest::collection::vec(0_u64..100, 1..10),
) {
let store = VersionStore::new(PageSize::DEFAULT);
let mgr = TxnManager::default();
// Oracle: pgno -> commit seqs published for that page, sorted below.
let mut page_history: std::collections::BTreeMap<u32, Vec<u64>> =
std::collections::BTreeMap::new();
for _ in 0..num_writers {
let txn_id = mgr.alloc_txn_id().unwrap();
let seq = mgr.alloc_commit_seq();
for _ in 0..ops_per_writer {
// Deterministic pseudo-random page choice in 1..=num_pages.
#[allow(clippy::cast_possible_truncation)] let pgno_raw =
((seq.get() * 7 + txn_id.get() * 3) % u64::from(num_pages)) as u32 + 1;
let pgno = PageNumber::new(pgno_raw).unwrap();
let prev = store.chain_head(pgno).map(idx_to_version_pointer);
let version = PageVersion {
pgno,
commit_seq: seq,
created_by: TxnToken::new(txn_id, TxnEpoch::new(0)),
data: PageData::zeroed(PageSize::DEFAULT),
prev,
};
store.publish(version);
page_history.entry(pgno_raw).or_default().push(seq.get());
}
}
for seqs in page_history.values_mut() {
seqs.sort_unstable();
}
for &high in &snapshot_highs {
let snap = make_snapshot(high);
for (&pgno_raw, seqs) in &page_history {
let pgno = PageNumber::new(pgno_raw).unwrap();
// Oracle answer: the newest committed seq not exceeding the snapshot.
let expected_seq = seqs.iter().copied().filter(|&s| s <= high).max();
let resolved = store.resolve(pgno, &snap);
match (expected_seq, resolved) {
(None, None) => {} (Some(exp), Some(idx)) => {
let v = store.get_version(idx).unwrap();
prop_assert!(
v.commit_seq.get() <= high,
"page {} resolved commit_seq {} > snapshot high {}",
pgno_raw, v.commit_seq.get(), high
);
prop_assert_eq!(
v.commit_seq.get(), exp,
"page {} expected seq {} but got {}",
pgno_raw, exp, v.commit_seq.get()
);
}
(Some(exp), None) => {
prop_assert!(
false,
"page {} expected visible at seq {} but resolve returned None",
pgno_raw, exp
);
}
(None, Some(idx)) => {
let v = store.get_version(idx).unwrap();
prop_assert!(
false,
"page {} expected invisible but resolved to seq {}",
pgno_raw, v.commit_seq.get()
);
}
}
// Resolving twice under the same snapshot must yield the same index.
let resolved2 = store.resolve(pgno, &snap);
prop_assert_eq!(
resolved, resolved2,
"resolve must be deterministic for page {} at high {}",
pgno_raw, high
);
}
}
}
#[test]
// Walking any page's chain yields strictly descending commit seqs.
fn prop_version_chain_strictly_descending(
chain_len in 2_usize..30,
) {
let store = VersionStore::new(PageSize::DEFAULT);
let pgno = PageNumber::new(99).unwrap();
let mut prev: Option<VersionPointer> = None;
for seq in 1..=chain_len as u64 {
let version = PageVersion {
pgno,
commit_seq: CommitSeq::new(seq),
created_by: TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(0)),
data: PageData::zeroed(PageSize::DEFAULT),
prev,
};
let idx = store.publish(version);
prev = Some(idx_to_version_pointer(idx));
}
let chain = store.walk_chain(pgno);
prop_assert!(chain.len() >= 2, "chain too short: {}", chain.len());
for window in chain.windows(2) {
prop_assert!(
window[0].commit_seq > window[1].commit_seq,
"version chain not strictly descending: {} >= {}",
window[0].commit_seq.get(),
window[1].commit_seq.get()
);
}
}
}
}