use std::collections::{BTreeMap, HashMap, btree_map::Entry as BTreeEntry};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use std::time::Duration;
use fsqlite_types::sync_primitives::Mutex;
use fsqlite_types::{
CommitSeq, MergePageKind, PageData, PageNumber, PageSize, PageVersion, SchemaEpoch, Snapshot,
TxnEpoch, TxnId, TxnToken,
};
use fsqlite_wal::DEFAULT_RAPTORQ_REPAIR_SYMBOLS;
use crate::cache_aligned::{logical_now_epoch_secs, logical_now_millis};
use crate::cell_visibility::CellVisibilityLog;
use crate::core_types::{
CommitIndex, InProcessPageLockTable, Transaction, TransactionMode, TransactionState,
};
use crate::ebr::{GLOBAL_EBR_METRICS, VersionGuardRegistry, VersionGuardTicket};
use crate::invariants::{SerializedWriteMutex, TxnManager, VersionStore};
use crate::observability::{mvcc_snapshot_established, mvcc_snapshot_released};
use crate::shm::SharedMemoryLayout;
/// Default drain budget (ms) used by `drain_concurrent_writers_via_lock_table_scan`.
const DEFAULT_BUSY_TIMEOUT_MS: u64 = 100;
/// Default lease (seconds) published with the serialized-writer SHM indicator.
const DEFAULT_SERIALIZED_WRITER_LEASE_SECS: u64 = 30;
/// Default soft bound on a page's MVCC version-chain length.
const DEFAULT_MAX_CHAIN_LENGTH: usize = 64;
/// Default chain length at which a warning is logged (kept <= max chain length).
const DEFAULT_CHAIN_LENGTH_WARNING: usize = 32;
/// Floor for the adaptive post-commit compaction threshold.
const PROACTIVE_COMPACT_THRESHOLD: usize = 8;
/// Hard chain limit = soft limit * this multiplier (see `enforce_chain_bound_for_page`).
const SOFT_BOUND_MULTIPLIER: usize = 4;
/// Sentinel stored in `cached_gc_horizon` meaning "no active snapshot pins a horizon".
const NO_GC_HORIZON: u64 = u64::MAX;
/// High bit marking a process-birth token as carrying /proc start-time ticks
/// (see `current_process_birth_token` / `process_alive_os`).
const PID_BIRTH_PROCFS_TAG: u64 = 1_u64 << 63;
/// Monotonic source of per-connection identifiers.
static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
/// Reads a process's start time (clock ticks since boot) from
/// `/proc/<pid>/stat`.
///
/// The second stat field (`comm`) may itself contain spaces and parentheses,
/// so parsing anchors on the *last* `)`: field 3 (`state`) becomes index 0 of
/// the whitespace-split tail, which puts field 22 (`starttime`) at index 19.
///
/// Returns `None` if the file cannot be read or the field does not parse.
#[cfg(unix)]
fn read_proc_start_time_ticks(pid: u32) -> Option<u64> {
let stat_path = std::path::Path::new("/proc")
.join(pid.to_string())
.join("stat");
let stat = std::fs::read_to_string(stat_path).ok()?;
let comm_end = stat.rfind(')')?;
// Skip past "<pid> (<comm>)" and any whitespace preceding the state field.
let tail = stat.get(comm_end + 1..)?.trim_start();
tail.split_whitespace().nth(19)?.parse::<u64>().ok()
}
/// Produces a "birth token" for the current process, used together with the
/// PID to detect PID reuse when validating a serialized-writer lease.
///
/// On Unix with procfs available, the token is the process start time from
/// `/proc/self/stat` tagged with `PID_BIRTH_PROCFS_TAG`; otherwise the
/// caller-supplied `now_fallback` (untagged) is used.
fn current_process_birth_token(now_fallback: u64) -> u64 {
#[cfg(unix)]
{
if !std::path::Path::new("/proc").exists() {
return now_fallback;
}
if let Some(start_ticks) = read_proc_start_time_ticks(std::process::id()) {
// Mask the ticks so the tag bit unambiguously marks a procfs token.
return PID_BIRTH_PROCFS_TAG | (start_ticks & !PID_BIRTH_PROCFS_TAG);
}
now_fallback
}
#[cfg(not(unix))]
{
now_fallback
}
}
/// Best-effort liveness check for the process identified by `pid` and its
/// birth token (as produced by `current_process_birth_token`).
///
/// Conservative direction: when liveness cannot be determined (no procfs,
/// non-Unix platform, untagged token), it answers `true` so that a
/// possibly-live writer lease is never stolen.
fn process_alive_os(pid: u32, pid_birth: u64) -> bool {
#[cfg(unix)]
{
if pid == 0 {
return false;
}
if !std::path::Path::new("/proc").exists() {
return true;
}
let proc_dir = std::path::Path::new("/proc").join(pid.to_string());
if !proc_dir.exists() {
return false;
}
// Untagged token: no start-time recorded, so the PID existing is all we can check.
if pid_birth & PID_BIRTH_PROCFS_TAG == 0 {
return true;
}
// Tagged token: the PID is alive only if its start time still matches
// (guards against PID reuse after the original process died).
let expected_ticks = pid_birth & !PID_BIRTH_PROCFS_TAG;
read_proc_start_time_ticks(pid).is_some_and(|start_ticks| start_ticks == expected_ticks)
}
#[cfg(not(unix))]
{
let _ = (pid, pid_birth);
true
}
}
/// How a transaction is begun, mirroring SQLite's `BEGIN` variants plus a
/// concurrent MVCC mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BeginKind {
/// Snapshot is established lazily on first read/write; nothing acquired up front.
Deferred,
/// Serialized mode; writer exclusion acquired at begin.
Immediate,
/// Serialized mode; writer exclusion acquired at begin.
Exclusive,
/// Optimistic concurrent writer using per-page locks and FCW validation.
Concurrent,
}
/// Policy governing how first-committer-wins write conflicts may be merged.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WriteMergePolicy {
/// Never merge: conflicting transactions abort and retry.
Off,
/// Structured / intent-based merging only (the default).
#[default]
Safe,
/// Additionally permits raw XOR merges of non-structured pages, and only
/// in debug builds (see `raw_xor_merge_allowed`, `set_write_merge_policy`).
LabUnsafe,
}
/// Outcome of `merge_decision` for a single conflicting page write.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MergeDecision {
/// Abort the transaction; caller may retry.
AbortRetry,
/// Resolve structured (SQLite-format) pages via intent replay / rebase.
IntentReplay,
/// Resolve non-structured pages via a structured patch merge.
StructuredPatch,
/// Raw XOR merge; lab/debug-only escape hatch.
RawXorLab,
}
/// Whether a raw XOR merge is permitted for a conflicting page.
///
/// Structured SQLite pages are never eligible; otherwise the policy must be
/// `LabUnsafe` *and* this must be a debug build.
#[must_use]
pub const fn raw_xor_merge_allowed(
policy: WriteMergePolicy,
page_kind: MergePageKind,
debug_build: bool,
) -> bool {
match (page_kind.is_sqlite_structured(), policy) {
// Structured pages are categorically excluded from raw merging.
(true, _) => false,
(false, WriteMergePolicy::LabUnsafe) => debug_build,
(false, _) => false,
}
}
/// Chooses how a conflicting page write should be resolved under the given
/// merge policy.
///
/// `Off` always aborts. Otherwise: raw XOR when `raw_xor_merge_allowed`
/// permits it, intent replay for structured SQLite pages, and a structured
/// patch for everything else.
#[must_use]
pub const fn merge_decision(
policy: WriteMergePolicy,
page_kind: MergePageKind,
debug_build: bool,
) -> MergeDecision {
if matches!(policy, WriteMergePolicy::Off) {
return MergeDecision::AbortRetry;
}
// Only reachable under LabUnsafe + debug + non-structured page.
if raw_xor_merge_allowed(policy, page_kind, debug_build) {
return MergeDecision::RawXorLab;
}
if page_kind.is_sqlite_structured() {
MergeDecision::IntentReplay
} else {
MergeDecision::StructuredPatch
}
}
/// Byte-wise XOR delta between two equal-length slices; `None` when the
/// lengths differ. In GF(256), XOR is addition, so the delta applied to
/// `base` reproduces `target`.
#[must_use]
pub fn gf256_patch_delta(base: &[u8], target: &[u8]) -> Option<Vec<u8>> {
(base.len() == target.len()).then(|| {
base.iter()
.zip(target.iter())
.map(|(base_byte, target_byte)| base_byte ^ target_byte)
.collect()
})
}
/// True when two XOR deltas have the same length and never touch the same
/// byte offset (at every position at least one delta byte is zero).
#[must_use]
pub fn gf256_patches_disjoint(delta_a: &[u8], delta_b: &[u8]) -> bool {
if delta_a.len() != delta_b.len() {
return false;
}
delta_a
.iter()
.zip(delta_b.iter())
.all(|(a, b)| *a == 0 || *b == 0)
}
/// Applies two byte-disjoint XOR deltas to `base` in one pass.
///
/// Returns `None` unless all three slices share one length and the deltas
/// never modify the same offset; otherwise yields
/// `base[i] ^ delta_a[i] ^ delta_b[i]` for every position.
#[must_use]
pub fn compose_disjoint_gf256_patches(
base: &[u8],
delta_a: &[u8],
delta_b: &[u8],
) -> Option<Vec<u8>> {
let lengths_match = base.len() == delta_a.len() && base.len() == delta_b.len();
if !lengths_match {
return None;
}
// Disjointness: at every offset, at most one of the deltas is non-zero.
let disjoint = delta_a
.iter()
.zip(delta_b.iter())
.all(|(a, b)| *a == 0 || *b == 0);
if !disjoint {
return None;
}
let merged: Vec<u8> = base
.iter()
.zip(delta_a)
.zip(delta_b)
.map(|((base_byte, a), b)| base_byte ^ a ^ b)
.collect();
Some(merged)
}
/// Outcome of a commit request as reported to callers.
// NOTE(review): this enum is not constructed anywhere in this file — the
// payload semantics below are inferred from the variant names; confirm at
// the call sites that build it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitResponse {
/// Commit succeeded at the given commit sequence.
Ok(CommitSeq),
/// Conflicting pages, plus an associated commit sequence.
Conflict(Vec<PageNumber>, CommitSeq),
/// Transaction aborted with the contained error.
Aborted(MvccError),
/// I/O failure during commit.
IoError,
}
/// Errors surfaced by the transaction manager. The SQLite-style variants
/// correspond to the result-code strings produced by the `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MvccError {
/// A lock, mutex, or writer lease could not be acquired in time.
Busy,
/// Snapshot too old to commit (FCW conflict or SSI abort).
BusySnapshot,
/// Schema epoch changed since the transaction's snapshot was taken.
Schema,
/// Underlying I/O failure.
IoErr,
/// Operation not valid for the transaction's current state.
InvalidState,
/// The transaction-id allocation space is exhausted.
TxnIdExhausted,
/// Shared-memory buffer smaller than the required layout.
ShmTooSmall,
/// Shared-memory magic number mismatch.
ShmBadMagic,
/// Shared-memory layout version mismatch.
ShmVersionMismatch,
/// Shared-memory page-size field invalid.
ShmInvalidPageSize,
/// Shared-memory checksum mismatch.
ShmChecksumMismatch,
/// Rejected write-merge policy (e.g. `LabUnsafe` in a release build).
InvalidWriteMergePolicy,
/// Transaction ran longer than the configured maximum duration.
TxnMaxDurationExceeded,
}
impl std::fmt::Display for MvccError {
/// Renders each variant as its SQLite result-code name or a short message.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let text = match self {
Self::Busy => "SQLITE_BUSY",
Self::BusySnapshot => "SQLITE_BUSY_SNAPSHOT",
Self::Schema => "SQLITE_SCHEMA",
Self::IoErr => "SQLITE_IOERR",
Self::InvalidState => "invalid transaction state",
Self::TxnIdExhausted => "TxnId space exhausted",
Self::ShmTooSmall => "SHM buffer too small",
Self::ShmBadMagic => "SHM bad magic",
Self::ShmVersionMismatch => "SHM version mismatch",
Self::ShmInvalidPageSize => "SHM invalid page size",
Self::ShmChecksumMismatch => "SHM checksum mismatch",
Self::InvalidWriteMergePolicy => "invalid write-merge policy",
Self::TxnMaxDurationExceeded => "transaction exceeded max duration",
};
f.write_str(text)
}
}
impl std::error::Error for MvccError {}
/// State captured by `TransactionManager::savepoint`, sufficient to restore a
/// transaction's write set via `rollback_to_savepoint`.
#[derive(Debug)]
pub struct Savepoint {
/// User-supplied savepoint name (used for logging on rollback).
pub name: String,
/// Cheap `Arc` snapshot of the page write set at savepoint time
/// (copy-on-write: later writes use `Arc::make_mut`).
pub write_set_snapshot:
Arc<HashMap<PageNumber, PageData, fsqlite_types::PageNumberBuildHasher>>,
/// Length of the ordered `write_set` page list at savepoint time.
pub write_set_len: usize,
}
/// Per-connection MVCC transaction manager: snapshot assignment,
/// first-committer-wins validation, version publication, chain-length
/// bounding/GC, and the serialized-writer exclusion protocol.
pub struct TransactionManager {
/// Allocates transaction ids and commit sequence numbers.
txn_manager: TxnManager,
/// Page version chains plus snapshot-visibility resolution.
version_store: VersionStore,
/// Per-page locks taken by concurrent-mode writers.
lock_table: InProcessPageLockTable,
/// In-process mutex half of the serialized-writer exclusion.
write_mutex: SerializedWriteMutex,
/// Cross-process shared-memory region (writer indicator, page size).
shm: SharedMemoryLayout,
/// Latest committed sequence per page, used for conflict checks.
commit_index: CommitIndex,
/// Unique id of this connection (allocated from `NEXT_CONN_ID`).
conn_id: u64,
/// Current schema epoch; commits from older epochs fail with `Schema`.
schema_epoch: SchemaEpoch,
/// Policy for resolving conflicting concurrent page writes.
write_merge_policy: WriteMergePolicy,
/// Whether SSI dangerous-structure detection applies to new transactions.
ssi_enabled: bool,
/// RaptorQ repair-symbol count (WAL erasure coding PRAGMA).
raptorq_repair_symbols: u8,
/// Budget for draining concurrent writers before returning SQLITE_BUSY.
busy_timeout_ms: u64,
/// Lease duration published with the serialized-writer SHM indicator.
serialized_writer_lease_secs: u64,
/// Maximum transaction duration before forced abort.
txn_max_duration_ms: u64,
/// EBR guard registry shared with the version store.
version_guard_registry: Arc<VersionGuardRegistry>,
/// Soft bound on a page's version-chain length.
max_chain_length: usize,
/// Chain length at which warnings are logged.
chain_length_warning: usize,
/// EWMA of sampled chain lengths, fixed-point x256.
chain_ewma_x256: AtomicU64,
/// Snapshot high-water mark of each active transaction.
active_snapshot_highs: Mutex<HashMap<TxnId, CommitSeq>>,
/// Refcounts of active snapshots keyed by commit seq; the minimum key is
/// the GC horizon.
active_snapshot_high_counts: Mutex<BTreeMap<CommitSeq, usize>>,
/// Cached minimum active snapshot seq, or `NO_GC_HORIZON` when none.
cached_gc_horizon: AtomicU64,
/// Cell-level visibility log (committed / rolled-back txn tokens).
cell_log: CellVisibilityLog,
}
impl TransactionManager {
/// Builds a manager for the given page size with default tunables
/// (merge policy `Safe`, SSI on, 100 ms busy timeout, 5 s max txn duration,
/// chain bounds 64/32, 25 MiB cell-visibility log, 128-page SHM layout).
#[must_use]
pub fn new(page_size: PageSize) -> Self {
let version_guard_registry = Arc::new(VersionGuardRegistry::default());
Self {
txn_manager: TxnManager::default(),
// The version store shares the guard registry so EBR reclamation
// respects guards registered by transactions here.
version_store: VersionStore::new_with_guard_registry(
page_size,
Arc::clone(&version_guard_registry),
),
lock_table: InProcessPageLockTable::new(),
write_mutex: SerializedWriteMutex::new(),
shm: SharedMemoryLayout::new(page_size, 128),
commit_index: CommitIndex::new(),
conn_id: NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed),
schema_epoch: SchemaEpoch::ZERO,
write_merge_policy: WriteMergePolicy::default(),
ssi_enabled: true,
raptorq_repair_symbols: DEFAULT_RAPTORQ_REPAIR_SYMBOLS,
busy_timeout_ms: DEFAULT_BUSY_TIMEOUT_MS,
serialized_writer_lease_secs: DEFAULT_SERIALIZED_WRITER_LEASE_SECS,
txn_max_duration_ms: 5_000,
version_guard_registry,
max_chain_length: DEFAULT_MAX_CHAIN_LENGTH,
chain_length_warning: DEFAULT_CHAIN_LENGTH_WARNING,
// EWMA starts at 1.0 in x256 fixed point.
chain_ewma_x256: AtomicU64::new(256),
active_snapshot_highs: Mutex::new(HashMap::new()),
active_snapshot_high_counts: Mutex::new(BTreeMap::new()),
cached_gc_horizon: AtomicU64::new(NO_GC_HORIZON),
cell_log: CellVisibilityLog::new(25 * 1024 * 1024),
}
}
/// Shared EBR guard registry (also held by the version store).
#[must_use]
pub fn version_guard_registry(&self) -> &Arc<VersionGuardRegistry> {
&self.version_guard_registry
}
/// Cell-level visibility log.
#[must_use]
pub fn cell_log(&self) -> &CellVisibilityLog {
&self.cell_log
}
/// Unique identifier of this connection.
#[must_use]
pub const fn conn_id(&self) -> u64 {
self.conn_id
}
/// Busy timeout budget in milliseconds.
#[must_use]
pub const fn busy_timeout_ms(&self) -> u64 {
self.busy_timeout_ms
}
/// Sets the busy timeout budget (milliseconds).
pub fn set_busy_timeout_ms(&mut self, busy_timeout_ms: u64) {
self.busy_timeout_ms = busy_timeout_ms;
}
/// Current write-merge policy.
#[must_use]
pub const fn write_merge_policy(&self) -> WriteMergePolicy {
self.write_merge_policy
}
/// Sets the write-merge policy.
///
/// # Errors
/// `InvalidWriteMergePolicy` when `LabUnsafe` is requested in a release
/// build — the raw-XOR escape hatch is debug-only.
pub fn set_write_merge_policy(&mut self, policy: WriteMergePolicy) -> Result<(), MvccError> {
if matches!(policy, WriteMergePolicy::LabUnsafe) && !cfg!(debug_assertions) {
return Err(MvccError::InvalidWriteMergePolicy);
}
self.write_merge_policy = policy;
Ok(())
}
/// Whether SSI dangerous-structure detection applies to new transactions.
#[must_use]
pub const fn ssi_enabled(&self) -> bool {
self.ssi_enabled
}
/// Toggles SSI; affects transactions begun after the change.
pub fn set_ssi_enabled(&mut self, enabled: bool) {
let old = self.ssi_enabled;
self.ssi_enabled = enabled;
tracing::debug!(
conn_id = self.conn_id,
old_value = old,
new_value = enabled,
"PRAGMA fsqlite.serializable changed"
);
}
/// Current RaptorQ repair-symbol count.
#[must_use]
pub const fn raptorq_repair_symbols(&self) -> u8 {
self.raptorq_repair_symbols
}
/// Sets the RaptorQ repair-symbol count.
pub fn set_raptorq_repair_symbols(&mut self, value: u8) {
let old = self.raptorq_repair_symbols;
self.raptorq_repair_symbols = value;
tracing::debug!(
conn_id = self.conn_id,
old_value = old,
new_value = value,
"PRAGMA raptorq_repair_symbols changed"
);
}
/// Maximum transaction duration in milliseconds.
#[must_use]
pub const fn txn_max_duration_ms(&self) -> u64 {
self.txn_max_duration_ms
}
/// Sets the maximum transaction duration; clamped to at least 1 ms.
pub fn set_txn_max_duration_ms(&mut self, max_duration_ms: u64) {
self.txn_max_duration_ms = max_duration_ms.max(1);
}
/// Soft bound on version-chain length.
#[must_use]
pub const fn max_chain_length(&self) -> usize {
self.max_chain_length
}
/// Sets the soft chain bound (minimum 1) and keeps the warning threshold
/// at or below it.
pub fn set_max_chain_length(&mut self, max_chain_length: usize) {
self.max_chain_length = max_chain_length.max(1);
if self.chain_length_warning > self.max_chain_length {
self.chain_length_warning = self.max_chain_length;
}
}
/// Chain length at which warnings are logged.
#[must_use]
pub const fn chain_length_warning(&self) -> usize {
self.chain_length_warning
}
/// Sets the warning threshold, clamped to `[1, max_chain_length]`.
pub fn set_chain_length_warning(&mut self, chain_length_warning: usize) {
self.chain_length_warning = chain_length_warning.clamp(1, self.max_chain_length);
}
/// Cached GC horizon (minimum active snapshot seq), or `None` when no
/// active snapshot pins one.
#[must_use]
pub fn cached_gc_horizon(&self) -> Option<CommitSeq> {
let raw = self.cached_gc_horizon.load(Ordering::Acquire);
if raw == NO_GC_HORIZON {
None
} else {
Some(CommitSeq::new(raw))
}
}
/// Starts a new transaction of the given kind.
///
/// `Concurrent` maps to optimistic per-page locking; every other kind runs
/// serialized. All kinds except `Deferred` establish their MVCC snapshot
/// immediately; `Immediate`/`Exclusive` additionally acquire the serialized
/// writer exclusion up front.
///
/// # Errors
/// `TxnIdExhausted` when the txn-id space is spent; `Busy` when writer
/// exclusion cannot be acquired.
pub fn begin(&self, kind: BeginKind) -> Result<Transaction, MvccError> {
let txn_id = self
.txn_manager
.alloc_txn_id()
.ok_or(MvccError::TxnIdExhausted)?;
let mode = if kind == BeginKind::Concurrent {
TransactionMode::Concurrent
} else {
TransactionMode::Serialized
};
let snapshot = self.load_consistent_snapshot();
// BEGIN DEFERRED takes its snapshot lazily on first access instead.
let snapshot_established = kind != BeginKind::Deferred;
let mut txn = Transaction::new(txn_id, TxnEpoch::new(0), snapshot, mode);
// Pin an EBR guard so versions this txn may read cannot be reclaimed.
txn.version_guard = Some(VersionGuardTicket::register(Arc::clone(
&self.version_guard_registry,
)));
txn.snapshot_established = snapshot_established;
if snapshot_established {
mvcc_snapshot_established();
self.register_active_snapshot(txn_id, snapshot.high);
}
// SSI setting is latched at begin so mid-txn PRAGMA changes don't apply.
txn.ssi_enabled_at_begin = self.ssi_enabled;
if kind == BeginKind::Immediate || kind == BeginKind::Exclusive {
self.acquire_serialized_writer_exclusion(txn_id)?;
txn.serialized_write_lock_held = true;
}
tracing::info!(
conn_id = self.conn_id,
txn_id = %txn_id,
?kind,
?mode,
snapshot_high = snapshot.high.get(),
snapshot_established,
"transaction begun"
);
Ok(txn)
}
/// Reads a page as visible to `txn`, recording the read for SSI tracking.
///
/// Lookup order: the transaction's own write set first (read-your-writes),
/// then the version store filtered by the transaction's snapshot. A
/// serialized deferred transaction establishes its snapshot here on first
/// read. Returns `None` when no visible version exists or the transaction
/// exceeded its max duration (in which case it has been aborted).
///
/// # Panics
/// If `txn` is not active.
pub fn read_page(&self, txn: &mut Transaction, pgno: PageNumber) -> Option<PageData> {
assert_eq!(
txn.state,
TransactionState::Active,
"can only read in active transactions"
);
if self.ensure_txn_within_max_duration(txn).is_err() {
return None;
}
if let Some(data) = txn.write_set_data.get(&pgno).cloned() {
// Reading our own write: track the version the write was based on
// (falling back to the snapshot high when untracked).
let tracked_version = txn
.write_version_for_page(pgno)
.and_then(|entry| entry.new_version.or(entry.old_version))
.unwrap_or(txn.snapshot.high);
txn.record_page_read(pgno, tracked_version);
return Some(data);
}
if txn.mode == TransactionMode::Serialized && !txn.snapshot_established {
txn.snapshot = self.load_consistent_snapshot();
txn.snapshot_established = true;
mvcc_snapshot_established();
self.register_active_snapshot(txn.txn_id, txn.snapshot.high);
tracing::debug!(
txn_id = %txn.txn_id,
snapshot_high = txn.snapshot.high.get(),
"deferred snapshot established on read"
);
}
let version = self
.version_store
.resolve_visible_version(pgno, &txn.snapshot)?;
txn.record_page_read(pgno, version.commit_seq);
Some(version.data)
}
/// Records reads for every leaf page touched by a range scan (for SSI
/// read-set tracking), without returning page data.
///
/// Pages with no visible version are recorded at the snapshot high so a
/// later insert into the scanned range still registers as a conflict.
///
/// # Panics
/// If `txn` is not active.
pub fn record_range_scan(&self, txn: &mut Transaction, leaf_pages: &[PageNumber]) {
assert_eq!(
txn.state,
TransactionState::Active,
"can only record range scan in active transactions"
);
let fallback_version = txn.snapshot.high;
for &page in leaf_pages {
let version = self
.resolve_visible_commit_seq(txn, page)
.unwrap_or(fallback_version);
txn.record_page_read(page, version);
}
}
/// Reads an inclusive page range, returning `(page, data)` pairs in order.
///
/// Missing pages are returned as `None` and still recorded as reads (at the
/// snapshot high) so phantom inserts into the range conflict later.
/// An inverted range yields an empty vector.
#[must_use]
pub fn read_page_range(
&self,
txn: &mut Transaction,
start_page: PageNumber,
end_page: PageNumber,
) -> Vec<(PageNumber, Option<PageData>)> {
if start_page.get() > end_page.get() {
return Vec::new();
}
let mut visible_pages = Vec::new();
for raw_page in start_page.get()..=end_page.get() {
if let Some(page) = PageNumber::new(raw_page) {
let page_data = self.read_page(txn, page);
if page_data.is_none() {
txn.record_page_read(page, txn.snapshot.high);
}
visible_pages.push((page, page_data));
}
}
visible_pages
}
/// Registers (or re-registers) `txn_id`'s snapshot high and refreshes the
/// cached GC horizon.
///
/// Both maps are locked together — always `active_snapshot_highs` first,
/// then `active_snapshot_high_counts` — matching `unregister_active_snapshot`
/// to avoid lock-order inversion.
fn register_active_snapshot(&self, txn_id: TxnId, snapshot_high: CommitSeq) {
let mut active = self.active_snapshot_highs.lock();
let mut snapshot_counts = self.active_snapshot_high_counts.lock();
// Re-registration (e.g. a refreshed snapshot) drops the old refcount.
if let Some(previous_high) = active.insert(txn_id, snapshot_high) {
decrement_active_snapshot_refcount(&mut snapshot_counts, previous_high);
}
increment_active_snapshot_refcount(&mut snapshot_counts, snapshot_high);
self.cached_gc_horizon.store(
active_snapshot_horizon_raw(&snapshot_counts),
Ordering::Release,
);
}
/// Removes `txn_id`'s snapshot registration and refreshes the cached GC
/// horizon. Same lock order as `register_active_snapshot`.
fn unregister_active_snapshot(&self, txn_id: TxnId) {
let mut active = self.active_snapshot_highs.lock();
let mut snapshot_counts = self.active_snapshot_high_counts.lock();
if let Some(snapshot_high) = active.remove(&txn_id) {
decrement_active_snapshot_refcount(&mut snapshot_counts, snapshot_high);
}
self.cached_gc_horizon.store(
active_snapshot_horizon_raw(&snapshot_counts),
Ordering::Release,
);
}
/// GC horizon for eager pruning: the minimum active snapshot seq, or — when
/// nothing is pinned — the last allocated commit sequence.
fn eager_gc_horizon(&self) -> CommitSeq {
if let Some(cached) = self.cached_gc_horizon() {
return cached;
}
CommitSeq::new(self.txn_manager.current_commit_counter().saturating_sub(1))
}
/// Feeds one observed chain length into the global EBR metrics and the
/// per-manager EWMA used by `adaptive_compact_threshold`.
///
/// The EWMA is kept in x256 fixed point with weight 7/8 old, 1/8 new; the
/// sample is capped at 1024 so `old * 7 + sample_x256` cannot overflow.
// The stale `#[allow(clippy::unused_self)]` was removed: `self` is plainly
// used via `self.chain_ewma_x256`.
fn record_chain_length_sample(&self, chain_len: usize) {
let sample = u64::try_from(chain_len).unwrap_or(u64::MAX);
GLOBAL_EBR_METRICS.record_chain_length_sample(sample);
let sample_x256 = chain_len.min(1024) as u64 * 256;
// fetch_update always succeeds here (closure never returns None);
// Relaxed is fine for a statistics-only counter.
let _ = self
.chain_ewma_x256
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
Some((old * 7 + sample_x256) / 8)
});
}
/// Adaptive chain-compaction trigger used after commits: twice the EWMA of
/// recent chain lengths, clamped to
/// `[min(PROACTIVE_COMPACT_THRESHOLD, upper), max(max_chain_length / 2, 1)]`.
///
/// The lower bound is capped by the upper bound first, so the `clamp` call
/// can never panic on an inverted range.
#[must_use]
fn adaptive_compact_threshold(&self) -> usize {
let ewma_x256 = self.chain_ewma_x256.load(Ordering::Relaxed);
// Drop the x256 fixed-point scale back to whole chain entries.
let ewma = (ewma_x256 / 256) as usize;
let upper_bound = (self.max_chain_length / 2).max(1);
let lower_bound = PROACTIVE_COMPACT_THRESHOLD.min(upper_bound);
// Return the expression directly (was a redundant `let` + return,
// clippy::let_and_return).
ewma.saturating_mul(2).clamp(lower_bound, upper_bound)
}
/// Bounds the version-chain length of `pgno` before publishing a new version.
///
/// Escalation ladder:
/// 1. below the soft bound (`max_chain_length`) — OK (warn once past the
///    warning threshold);
/// 2. at/over the soft bound — try an eager prune up to the GC horizon and
///    re-check;
/// 3. still over the soft bound but under the hard limit
///    (`max_chain_length * SOFT_BOUND_MULTIPLIER`) — proceed with a warning;
/// 4. at/over the hard limit — refuse with `Busy` (horizon pinned by a
///    long-running transaction).
fn enforce_chain_bound_for_page(&self, pgno: PageNumber) -> Result<(), MvccError> {
let mut chain_len = self.version_store.chain_length(pgno);
self.record_chain_length_sample(chain_len);
if chain_len >= self.chain_length_warning {
tracing::warn!(
pgno = pgno.get(),
chain_len,
warning_threshold = self.chain_length_warning,
max_chain_length = self.max_chain_length,
"MVCC version chain length warning threshold crossed"
);
}
if chain_len < self.max_chain_length {
return Ok(());
}
let horizon = self.eager_gc_horizon();
let freed = self.version_store.prune_page_chain_eager(pgno, horizon);
if freed > 0 {
let freed_u64 = u64::try_from(freed).unwrap_or(u64::MAX);
GLOBAL_EBR_METRICS.record_gc_freed(freed_u64);
tracing::debug!(
pgno = pgno.get(),
freed,
gc_horizon = horizon.get(),
"opportunistic_prune"
);
}
// When nothing was freed, re-read the length in case another thread
// changed the chain; otherwise just subtract what we freed.
chain_len = if freed > 0 {
chain_len.saturating_sub(freed)
} else {
self.version_store.chain_length(pgno)
};
self.record_chain_length_sample(chain_len);
if chain_len < self.max_chain_length {
return Ok(());
}
let hard_limit = self.max_chain_length.saturating_mul(SOFT_BOUND_MULTIPLIER);
if chain_len < hard_limit {
tracing::warn!(
pgno = pgno.get(),
chain_len,
soft_threshold = self.max_chain_length,
hard_limit,
gc_horizon = horizon.get(),
"chain_soft_bound_exceeded_proceeding"
);
return Ok(());
}
GLOBAL_EBR_METRICS.record_gc_blocked();
tracing::error!(
pgno = pgno.get(),
chain_len,
soft_threshold = self.max_chain_length,
hard_limit,
gc_horizon = horizon.get(),
"MVCC chain hard limit exceeded — gc_horizon pinned by long-running txn"
);
Err(MvccError::Busy)
}
/// Buffers a page write in the transaction's write set, dispatching to the
/// serialized or concurrent protocol based on the transaction's mode.
///
/// # Errors
/// `TxnMaxDurationExceeded` (txn is aborted), or mode-specific errors such
/// as `Busy`/`BusySnapshot`/`Schema` from the underlying protocol.
///
/// # Panics
/// If `txn` is not active.
pub fn write_page(
&self,
txn: &mut Transaction,
pgno: PageNumber,
data: PageData,
) -> Result<(), MvccError> {
assert_eq!(
txn.state,
TransactionState::Active,
"can only write in active transactions"
);
self.ensure_txn_within_max_duration(txn)?;
if txn.mode == TransactionMode::Serialized {
self.write_page_serialized(txn, pgno, data)?;
} else {
self.write_page_concurrent(txn, pgno, data)?;
}
Ok(())
}
/// Commits `txn`, dispatching to the serialized or concurrent protocol.
///
/// Read-only transactions (empty write set) commit trivially and return
/// `CommitSeq::ZERO` without allocating a commit sequence.
///
/// # Errors
/// `InvalidState` for non-active transactions; `Schema` when the schema
/// epoch moved since the snapshot (txn is aborted); plus protocol-specific
/// errors from `commit_serialized` / `commit_concurrent`.
pub fn commit(&self, txn: &mut Transaction) -> Result<CommitSeq, MvccError> {
if txn.state != TransactionState::Active {
tracing::error!(
txn_id = %txn.txn_id,
state = ?txn.state,
"lock protocol violation: commit attempted on non-active transaction"
);
return Err(MvccError::InvalidState);
}
self.ensure_txn_within_max_duration(txn)?;
if self.schema_epoch != txn.snapshot.schema_epoch {
self.abort(txn);
return Err(MvccError::Schema);
}
// Read-only fast path: nothing to publish, nothing to validate.
if txn.write_set.is_empty() && txn.write_set_data.is_empty() {
txn.commit();
self.release_all_resources(txn);
return Ok(CommitSeq::ZERO);
}
if txn.mode == TransactionMode::Serialized {
self.commit_serialized(txn)
} else {
self.commit_concurrent(txn)
}
}
/// Aborts an active transaction: rolls back its cell-visibility entries,
/// clears structural-page tracking, transitions the state machine, and
/// releases all locks, snapshot registrations, and EBR guards it holds.
/// A no-op for transactions that are not active.
pub fn abort(&self, txn: &mut Transaction) {
if txn.state != TransactionState::Active {
// Fixed formatting: the early return and closing brace were fused
// on one line, which violates rustfmt output.
return;
}
self.cell_log.rollback_txn(txn.token());
txn.clear_structural_pages();
txn.abort();
self.release_all_resources(txn);
tracing::info!(
txn_id = %txn.txn_id,
?txn.mode,
"transaction aborted"
);
}
/// Captures the transaction's write-set state as a savepoint. Cheap: the
/// write-set map is an `Arc`, so this is a refcount bump, not a deep copy.
///
/// # Panics
/// If `txn` is not active.
#[must_use]
pub fn savepoint(txn: &Transaction, name: &str) -> Savepoint {
assert_eq!(
txn.state,
TransactionState::Active,
"can only create savepoints in active transactions"
);
Savepoint {
name: name.to_owned(),
write_set_snapshot: txn.write_set_data.clone(),
write_set_len: txn.write_set.len(),
}
}
/// Restores the transaction's write set to the state captured by
/// `savepoint`: reinstates the data map, truncates the ordered page list,
/// and drops version-tracking entries for pages no longer in the write set.
///
/// # Panics
/// If `txn` is not active.
pub fn rollback_to_savepoint(txn: &mut Transaction, savepoint: &Savepoint) {
assert_eq!(
txn.state,
TransactionState::Active,
"can only rollback to savepoint in active transactions"
);
txn.write_set_data = savepoint.write_set_snapshot.clone();
txn.write_set.truncate(savepoint.write_set_len);
txn.write_set_versions
.retain(|pgno, _| txn.write_set_data.contains_key(pgno));
tracing::debug!(
txn_id = %txn.txn_id,
savepoint = %savepoint.name,
"rolled back to savepoint"
);
}
/// The page version store.
#[must_use]
pub fn version_store(&self) -> &VersionStore {
&self.version_store
}
/// The per-page latest-commit index.
#[must_use]
pub fn commit_index(&self) -> &CommitIndex {
&self.commit_index
}
/// The in-process page lock table used by concurrent writers.
#[must_use]
pub fn lock_table(&self) -> &InProcessPageLockTable {
&self.lock_table
}
/// The serialized write mutex.
#[must_use]
pub fn write_mutex(&self) -> &SerializedWriteMutex {
&self.write_mutex
}
/// Bumps the schema epoch, invalidating snapshots taken under the old one
/// (their commits fail with `Schema`). Panics on u64 overflow in debug
/// builds — practically unreachable.
pub fn advance_schema_epoch(&mut self) {
self.schema_epoch = SchemaEpoch::new(self.schema_epoch.get() + 1);
}
/// Builds a snapshot at the current commit frontier.
// NOTE(review): `high` is the commit counter minus one — presumably the
// counter is "next sequence to allocate", making `high` the last finished
// sequence; confirm against `TxnManager::current_commit_counter`.
fn load_consistent_snapshot(&self) -> Snapshot {
let counter = self.txn_manager.current_commit_counter();
let high = CommitSeq::new(counter.saturating_sub(1));
Snapshot::new(high, self.schema_epoch)
}
/// Serialized-mode write: lazily upgrades a deferred transaction to the
/// serialized writer on its first write, then buffers the page.
///
/// The upgrade acquires the writer exclusion and re-reads the world state.
/// If the transaction already holds a snapshot, that snapshot must still be
/// current — otherwise the exclusion is released and the write fails with
/// `Schema` (epoch moved) or `BusySnapshot` (commits landed since).
fn write_page_serialized(
&self,
txn: &mut Transaction,
pgno: PageNumber,
data: PageData,
) -> Result<(), MvccError> {
if !txn.serialized_write_lock_held {
self.acquire_serialized_writer_exclusion(txn.txn_id)?;
let snap_now = self.load_consistent_snapshot();
if txn.snapshot_established {
if snap_now.schema_epoch != txn.snapshot.schema_epoch {
self.release_serialized_writer_exclusion(txn.txn_id);
return Err(MvccError::Schema);
}
if snap_now.high != txn.snapshot.high {
self.release_serialized_writer_exclusion(txn.txn_id);
return Err(MvccError::BusySnapshot);
}
}
let had_snapshot = txn.snapshot_established;
txn.snapshot = snap_now;
txn.snapshot_established = true;
txn.serialized_write_lock_held = true;
// Only count/register the snapshot once per transaction.
if !had_snapshot {
mvcc_snapshot_established();
self.register_active_snapshot(txn.txn_id, txn.snapshot.high);
}
tracing::debug!(
txn_id = %txn.txn_id,
"serialized deferred upgrade: mutex acquired"
);
}
txn.record_page_write(pgno, self.resolve_visible_commit_seq(txn, pgno));
// Keep `write_set` an ordered, duplicate-free list of written pages.
let is_new_page = !txn.write_set_data.contains_key(&pgno);
if is_new_page {
txn.write_set.push(pgno);
}
Arc::make_mut(&mut txn.write_set_data).insert(pgno, data);
txn.structural_pages.insert(pgno);
Ok(())
}
/// Concurrent-mode write: verifies no serialized writer holds (or stalely
/// holds) the cross-process exclusion, takes the per-page lock, then buffers
/// the page in the write set.
///
/// # Errors
/// `Busy` when a serialized writer is active or the page lock is held by
/// another transaction.
fn write_page_concurrent(
&self,
txn: &mut Transaction,
pgno: PageNumber,
data: PageData,
) -> Result<(), MvccError> {
// A live serialized writer excludes all concurrent writers.
if self
.shm
.check_serialized_writer_exclusion(logical_now_epoch_secs(), process_alive_os)
.is_err()
{
return Err(MvccError::Busy);
}
self.lock_table
.try_acquire(pgno, txn.txn_id)
.map_err(|_| MvccError::Busy)?;
// `page_locks` is a set, so re-writing an already-locked page is fine.
let newly_locked = txn.page_locks.insert(pgno);
if newly_locked {
tracing::debug!(
txn_id = %txn.txn_id,
pgno = pgno.get(),
"concurrent: page lock acquired"
);
}
txn.record_page_write(pgno, self.resolve_visible_commit_seq(txn, pgno));
if !txn.write_set_data.contains_key(&pgno) {
txn.write_set.push(pgno);
}
Arc::make_mut(&mut txn.write_set_data).insert(pgno, data);
txn.structural_pages.insert(pgno);
Ok(())
}
/// Serialized-mode commit: verifies no page in the write set was committed
/// past the transaction's snapshot (first-committer-wins), then publishes
/// the write set at a freshly allocated commit sequence.
///
/// On any failure the transaction is aborted before returning.
fn commit_serialized(&self, txn: &mut Transaction) -> Result<CommitSeq, MvccError> {
for &pgno in &txn.write_set {
if let Some(latest) = self.commit_index.latest(pgno) {
if latest > txn.snapshot.high {
self.abort(txn);
return Err(MvccError::BusySnapshot);
}
}
}
let commit_seq = match self.publish_write_set(txn) {
Ok(commit_seq) => commit_seq,
Err(err) => {
self.abort(txn);
return Err(err);
}
};
self.txn_manager.finish_commit_seq(commit_seq);
txn.commit();
self.release_all_resources(txn);
tracing::info!(
txn_id = %txn.txn_id,
commit_seq = commit_seq.get(),
"serialized commit succeeded"
);
Ok(commit_seq)
}
/// Concurrent-mode commit with first-committer-wins validation and
/// policy-driven conflict merging.
///
/// Pipeline:
/// 1. SSI check — abort on a dangerous rw-antidependency structure;
/// 2. scan the write set against the commit index for pages committed past
///    our snapshot (FCW conflicts);
/// 3. per conflicting page, consult `merge_decision`: abort outright under
///    `Off`, otherwise try a structured rebase, falling back to a raw
///    disjoint-XOR rebase for opaque pages;
/// 4. if anything was rebased, re-run the SSI check (rebasing records new
///    reads that may complete a dangerous structure);
/// 5. publish the write set.
///
/// On any failure the transaction is aborted before returning.
fn commit_concurrent(&self, txn: &mut Transaction) -> Result<CommitSeq, MvccError> {
if txn.ssi_enabled_at_begin && txn.has_dangerous_structure() {
tracing::info!(
conn_id = self.conn_id,
txn_id = %txn.txn_id,
has_in_rw = txn.has_in_rw,
has_out_rw = txn.has_out_rw,
"SSI abort: dangerous structure detected"
);
self.abort(txn);
return Err(MvccError::BusySnapshot);
}
let mut conflicts = smallvec::SmallVec::<[PageNumber; 8]>::new();
for &pgno in &txn.write_set {
if let Some(latest) = self.commit_index.latest(pgno) {
if latest > txn.snapshot.high {
conflicts.push(pgno);
}
}
}
let mut rebased = false;
for pgno in conflicts {
// Classify from our buffered image; missing data is treated as opaque.
let page_kind = {
let data = txn.write_set_data.get(&pgno);
data.map_or(MergePageKind::Opaque, |page| {
MergePageKind::classify(page.as_bytes())
})
};
let decision =
merge_decision(self.write_merge_policy, page_kind, cfg!(debug_assertions));
match decision {
MergeDecision::AbortRetry => {
tracing::info!(
txn_id = %txn.txn_id,
pgno = pgno.get(),
?decision,
"FCW conflict: merge policy is Off, aborting"
);
self.abort(txn);
return Err(MvccError::BusySnapshot);
}
MergeDecision::IntentReplay
| MergeDecision::StructuredPatch
| MergeDecision::RawXorLab => {
if self.try_structured_rebase_page(txn, pgno, page_kind) {
tracing::info!(
txn_id = %txn.txn_id,
pgno = pgno.get(),
?page_kind,
?decision,
"FCW conflict resolved via structured rebase"
);
rebased = true;
} else if page_kind == MergePageKind::Opaque && self.try_rebase_page(txn, pgno)
{
tracing::info!(
txn_id = %txn.txn_id,
pgno = pgno.get(),
?page_kind,
?decision,
"FCW conflict resolved via raw disjoint rebase fallback"
);
rebased = true;
} else {
tracing::info!(
txn_id = %txn.txn_id,
pgno = pgno.get(),
?page_kind,
?decision,
"FCW conflict: structured rebase failed, aborting"
);
self.abort(txn);
return Err(MvccError::BusySnapshot);
}
}
}
}
if txn.ssi_enabled_at_begin && rebased && txn.has_dangerous_structure() {
tracing::info!(
conn_id = self.conn_id,
txn_id = %txn.txn_id,
has_in_rw = txn.has_in_rw,
has_out_rw = txn.has_out_rw,
"SSI abort after rebase: dangerous structure detected"
);
self.abort(txn);
return Err(MvccError::BusySnapshot);
}
let commit_seq = match self.publish_write_set(txn) {
Ok(commit_seq) => commit_seq,
Err(err) => {
self.abort(txn);
return Err(err);
}
};
self.txn_manager.finish_commit_seq(commit_seq);
txn.commit();
self.release_all_resources(txn);
tracing::info!(
txn_id = %txn.txn_id,
commit_seq = commit_seq.get(),
rebased,
"concurrent commit succeeded"
);
Ok(commit_seq)
}
/// Raw disjoint-XOR rebase fallback for opaque pages.
///
/// From the common snapshot-visible base, computes our XOR delta and the
/// committed head's XOR delta; when the two deltas never touch the same
/// byte, replaces our buffered image with base ⊕ ours ⊕ theirs and advances
/// the page's tracked read/old version to the head's commit seq (so the FCW
/// check passes on re-validation). Returns `false` when any prerequisite
/// image is missing or the deltas overlap.
fn try_rebase_page(&self, txn: &mut Transaction, pgno: PageNumber) -> bool {
let base_data = match self
.version_store
.resolve_visible_version(pgno, &txn.snapshot)
{
Some(version) => version.data,
None => {
return false;
}
};
let (latest_data, latest_seq) = match self.version_store.chain_head_version(pgno) {
Some(version) => (version.data.clone(), version.commit_seq),
None => return false,
};
let ours = match txn.write_set_data.get(&pgno) {
Some(data) => data.clone(),
None => return false,
};
let Some(delta_ours) = gf256_patch_delta(base_data.as_bytes(), ours.as_bytes()) else {
return false;
};
let Some(delta_theirs) = gf256_patch_delta(base_data.as_bytes(), latest_data.as_bytes())
else {
return false;
};
match compose_disjoint_gf256_patches(base_data.as_bytes(), &delta_ours, &delta_theirs) {
Some(merged) => {
Arc::make_mut(&mut txn.write_set_data).insert(pgno, PageData::from_vec(merged));
// Record that this txn has now "read" the head version it merged with.
txn.record_page_read(pgno, latest_seq);
if let Some(entry) = txn.write_set_versions.get_mut(&pgno) {
entry.old_version = Some(latest_seq);
}
true
}
None => false,
}
}
/// Structured (format-aware) rebase of a conflicting page via the merge
/// ladder in `crate::physical_merge`.
///
/// Feeds the snapshot base, the committed head, and our buffered image into
/// `evaluate_merge_ladder` together with the txn's intent log. On a
/// successful merge (or a `NoConflict` verdict) the page's tracked read/old
/// version is advanced to the head's commit seq; any other verdict or error
/// returns `false` so the caller can fall back or abort.
// NOTE(review): the `BtreeRef::Table(TableId::new(0))` and the `0` /
// `pgno == 1` arguments look like placeholders for table identity, db-offset
// and header-page detection — confirm against `evaluate_merge_ladder`'s
// signature.
fn try_structured_rebase_page(
&self,
txn: &mut Transaction,
pgno: PageNumber,
page_kind: MergePageKind,
) -> bool {
let base_data = match self
.version_store
.resolve_visible_version(pgno, &txn.snapshot)
{
Some(version) => version.data,
None => return false,
};
let (latest_data, latest_seq) = match self.version_store.chain_head_version(pgno) {
Some(version) => (version.data.clone(), version.commit_seq),
None => return false,
};
let ours = match txn.write_set_data.get(&pgno) {
Some(data) => data.clone(),
None => return false,
};
let btree_ref = fsqlite_types::BtreeRef::Table(fsqlite_types::TableId::new(0));
let result = crate::physical_merge::evaluate_merge_ladder(
self.write_merge_policy,
base_data.as_bytes(),
latest_data.as_bytes(),
ours.as_bytes(),
self.shm.page_size(),
0,
pgno.get() == 1,
page_kind,
btree_ref,
txn.snapshot.schema_epoch.get(),
self.schema_epoch.get(),
Some(&txn.intent_log),
None,
None,
);
match result {
Ok(crate::physical_merge::MergeLadderResult::StructuredMergeSucceeded {
merged_page,
}) => {
Arc::make_mut(&mut txn.write_set_data)
.insert(pgno, PageData::from_vec(merged_page));
txn.record_page_read(pgno, latest_seq);
if let Some(entry) = txn.write_set_versions.get_mut(&pgno) {
entry.old_version = Some(latest_seq);
}
true
}
Ok(crate::physical_merge::MergeLadderResult::NoConflict) => {
// Nothing actually collided; just advance the tracked version.
txn.record_page_read(pgno, latest_seq);
if let Some(entry) = txn.write_set_versions.get_mut(&pgno) {
entry.old_version = Some(latest_seq);
}
true
}
_ => false,
}
}
/// Publishes the transaction's buffered pages as new versions at a freshly
/// allocated commit sequence.
///
/// Chain bounds are enforced for every page *before* the sequence is
/// allocated, so a `Busy` here leaves no partial publication. The write-set
/// map is taken by `Arc::try_unwrap` to avoid a deep copy when this txn is
/// the sole owner. Afterwards, up to the first 16 pages get opportunistic
/// chain compaction based on the adaptive threshold.
fn publish_write_set(&self, txn: &mut Transaction) -> Result<CommitSeq, MvccError> {
let pages: Vec<PageNumber> = txn.write_set.iter().copied().collect();
for &pgno in &pages {
self.enforce_chain_bound_for_page(pgno)?;
}
let commit_seq = self.txn_manager.alloc_commit_seq();
let txn_token = TxnToken::new(txn.txn_id, txn.txn_epoch);
let mut data_map = Arc::try_unwrap(std::mem::take(&mut txn.write_set_data))
.unwrap_or_else(|arc| (*arc).clone());
for &pgno in &pages {
if let Some(data) = data_map.remove(&pgno) {
let version = PageVersion {
pgno,
commit_seq,
created_by: txn_token,
data,
prev: None,
};
self.version_store.publish(version);
}
self.commit_index.update(pgno, commit_seq);
txn.mark_page_write_committed(pgno, commit_seq);
}
self.cell_log.commit_txn(txn_token, commit_seq);
let horizon = self.eager_gc_horizon();
let compact_threshold = self.adaptive_compact_threshold();
// Bound post-commit compaction work to the first 16 pages.
for pgno in pages.iter().take(16) {
let chain_len = self.version_store.chain_length(*pgno);
self.record_chain_length_sample(chain_len);
if chain_len > compact_threshold {
let freed = self.version_store.prune_page_chain_eager(*pgno, horizon);
if freed > 0 {
let freed_u64 = u64::try_from(freed).unwrap_or(u64::MAX);
GLOBAL_EBR_METRICS.record_gc_freed(freed_u64);
}
}
}
txn.clear_structural_pages();
Ok(commit_seq)
}
/// Releases everything a finishing (committed or aborted) transaction holds:
/// snapshot registration, page locks, read/write tracking, the serialized
/// writer exclusion, and finally the EBR version guard (last, so versions
/// stay protected until all other teardown is done).
fn release_all_resources(&self, txn: &mut Transaction) {
if txn.snapshot_established {
mvcc_snapshot_released();
self.unregister_active_snapshot(txn.txn_id);
txn.snapshot_established = false;
}
self.lock_table
.release_set(txn.page_locks.drain(), txn.txn_id);
txn.clear_page_access_tracking();
if txn.serialized_write_lock_held {
self.release_serialized_writer_exclusion(txn.txn_id);
txn.serialized_write_lock_held = false;
}
drop(txn.version_guard.take());
}
/// Commit seq of the version of `pgno` visible to `txn`'s snapshot, if any.
fn resolve_visible_commit_seq(&self, txn: &Transaction, pgno: PageNumber) -> Option<CommitSeq> {
self.version_store
.resolve_visible_commit_seq(pgno, &txn.snapshot)
}
/// Enforces the maximum transaction duration: if the transaction has run
/// longer than `txn_max_duration_ms` it is aborted here and
/// `TxnMaxDurationExceeded` is returned; otherwise `Ok(())`.
fn ensure_txn_within_max_duration(&self, txn: &mut Transaction) -> Result<(), MvccError> {
let now_ms = logical_now_millis();
let elapsed_ms = now_ms.saturating_sub(txn.started_at_ms);
if elapsed_ms > self.txn_max_duration_ms {
self.abort(txn);
tracing::warn!(
txn_id = %txn.txn_id,
elapsed_ms,
max_duration_ms = self.txn_max_duration_ms,
"transaction exceeded max duration and was aborted"
);
return Err(MvccError::TxnMaxDurationExceeded);
}
Ok(())
}
/// Acquires the full serialized-writer exclusion for `txn_id`.
///
/// Four steps, each failure rolling back the previous ones:
/// 1. in-process `write_mutex`;
/// 2. verify no other (live) process holds the cross-process indicator;
/// 3. publish our indicator in shared memory with PID, birth token, and a
///    lease expiry;
/// 4. drain in-flight concurrent writers via the lock table.
///
/// # Errors
/// `Busy` at any step that cannot complete within its budget.
fn acquire_serialized_writer_exclusion(&self, txn_id: TxnId) -> Result<(), MvccError> {
self.write_mutex.try_acquire(txn_id).map_err(|_holder| {
tracing::warn!(
txn_id = %txn_id,
"serialized writer acquisition failed: mutex held"
);
MvccError::Busy
})?;
let now = logical_now_epoch_secs();
let pid = std::process::id();
// Birth token lets lease validation detect PID reuse across crashes.
let pid_birth = current_process_birth_token(now);
let lease_expiry = now.saturating_add(self.serialized_writer_lease_secs);
if self
.shm
.check_serialized_writer_exclusion(now, process_alive_os)
.is_err()
{
self.write_mutex.release(txn_id);
return Err(MvccError::Busy);
}
if !self
.shm
.acquire_serialized_writer(txn_id.get(), pid, pid_birth, lease_expiry)
{
tracing::warn!(
txn_id = %txn_id,
"serialized writer acquisition failed: indicator already set"
);
self.write_mutex.release(txn_id);
return Err(MvccError::Busy);
}
tracing::info!(
txn_id = %txn_id,
pid,
pid_birth,
lease_expiry,
"serialized writer exclusion published"
);
if let Err(err) = self.drain_concurrent_writers_via_lock_table_scan(txn_id) {
self.release_serialized_writer_exclusion(txn_id);
return Err(err);
}
Ok(())
}
/// Waits for all in-flight concurrent writers to finish by polling the lock
/// table at 1 ms intervals until it is empty.
///
/// New concurrent writers cannot start once the serialized-writer indicator
/// is published (see `write_page_concurrent`), so the count only drains.
///
/// # Errors
/// `Busy` when the lock table is still non-empty after `busy_timeout_ms`.
fn drain_concurrent_writers_via_lock_table_scan(&self, txn_id: TxnId) -> Result<(), MvccError> {
let mut elapsed_ms = 0_u64;
let mut remaining_budget_ms = self.busy_timeout_ms;
// Track the last observed count so progress is logged only on change.
let mut last_remaining = usize::MAX;
loop {
let remaining = self.lock_table.total_lock_count();
if remaining == 0 {
tracing::debug!(
txn_id = %txn_id,
elapsed_ms,
"serialized writer drain complete"
);
return Ok(());
}
if remaining_budget_ms == 0 {
tracing::warn!(
txn_id = %txn_id,
remaining,
elapsed_ms,
busy_timeout_ms = self.busy_timeout_ms,
"serialized writer drain timed out; returning SQLITE_BUSY"
);
return Err(MvccError::Busy);
}
if remaining != last_remaining {
last_remaining = remaining;
tracing::debug!(
txn_id = %txn_id,
remaining,
elapsed_ms,
"serialized writer drain progress"
);
}
std::thread::sleep(Duration::from_millis(1));
elapsed_ms = elapsed_ms.saturating_add(1);
remaining_budget_ms = remaining_budget_ms.saturating_sub(1);
}
}
/// Releases the serialized-writer exclusion in reverse acquisition order:
/// shared-memory indicator first, then the in-process mutex. Failed releases
/// indicate a protocol violation and are logged, not propagated.
fn release_serialized_writer_exclusion(&self, txn_id: TxnId) {
let writer_txn_id_raw = txn_id.get();
if !self.shm.release_serialized_writer(writer_txn_id_raw) {
tracing::error!(
txn_id = %txn_id,
writer_txn_id_raw,
"lock protocol violation: serialized writer indicator release failed (writer txn id mismatch)"
);
}
if !self.write_mutex.release(txn_id) {
tracing::error!(
txn_id = %txn_id,
"lock protocol violation: serialized write mutex release failed (not held by txn)"
);
}
}
}
fn increment_active_snapshot_refcount(
    snapshot_counts: &mut BTreeMap<CommitSeq, usize>,
    snapshot_high: CommitSeq,
) {
    // Bump the refcount for this snapshot horizon, materializing the entry
    // on first use (an absent entry is equivalent to a count of zero).
    let slot = snapshot_counts.entry(snapshot_high).or_default();
    *slot += 1;
}
fn decrement_active_snapshot_refcount(
snapshot_counts: &mut BTreeMap<CommitSeq, usize>,
snapshot_high: CommitSeq,
) {
match snapshot_counts.entry(snapshot_high) {
BTreeEntry::Occupied(mut entry) => {
let count = entry.get_mut();
if *count > 1 {
*count -= 1;
} else {
entry.remove();
}
}
BTreeEntry::Vacant(_) => {}
}
}
fn active_snapshot_horizon_raw(snapshot_counts: &BTreeMap<CommitSeq, usize>) -> u64 {
    // The GC horizon is the oldest (minimum) snapshot high still referenced.
    // With no active snapshots there is nothing to protect, so report the
    // sentinel that disables garbage-collection gating entirely.
    match snapshot_counts.first_key_value() {
        Some((oldest_high, _)) => oldest_high.get(),
        None => NO_GC_HORIZON,
    }
}
// Manual Debug impl: TransactionManager holds non-Debug internals (shm,
// lock table), so only the configuration knobs and derived counters that
// are cheap and safe to read are surfaced. `finish_non_exhaustive` signals
// that fields were intentionally omitted.
impl std::fmt::Debug for TransactionManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TransactionManager")
.field("conn_id", &self.conn_id)
.field("schema_epoch", &self.schema_epoch)
.field("write_merge_policy", &self.write_merge_policy)
.field("ssi_enabled", &self.ssi_enabled)
.field("txn_max_duration_ms", &self.txn_max_duration_ms)
.field("max_chain_length", &self.max_chain_length)
.field("chain_length_warning", &self.chain_length_warning)
// Derived values: computed at format time, not stored fields.
.field("cached_gc_horizon", &self.cached_gc_horizon())
.field(
"current_commit_counter",
&self.txn_manager.current_commit_counter(),
)
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ebr::GLOBAL_EBR_METRICS;
use std::hint::black_box;
use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use fsqlite_types::TxnId;
use proptest::prelude::*;
fn mgr() -> TransactionManager {
    // Manager with an effectively unlimited txn duration so slow CI runs
    // never trip the max-duration guard mid-test.
    let mut manager = TransactionManager::new(PageSize::DEFAULT);
    manager.set_txn_max_duration_ms(u64::MAX);
    manager
}
fn mgr_with_busy_timeout_ms(busy_timeout_ms: u64) -> TransactionManager {
    // Like `mgr`, but with an explicit busy timeout for contention tests.
    let mut manager = TransactionManager::new(PageSize::DEFAULT);
    manager.set_busy_timeout_ms(busy_timeout_ms);
    manager.set_txn_max_duration_ms(u64::MAX);
    manager
}
fn test_data(byte: u8) -> PageData {
    // Zeroed page whose first byte tags the page for later assertions.
    let mut page = PageData::zeroed(PageSize::DEFAULT);
    page.as_bytes_mut()[0] = byte;
    page
}
fn test_i64(v: i64) -> PageData {
    // Zeroed page carrying a little-endian i64 in its first eight bytes.
    let mut page = PageData::zeroed(PageSize::DEFAULT);
    page.as_bytes_mut()[..8].copy_from_slice(&v.to_le_bytes());
    page
}
fn decode_i64(data: &PageData) -> i64 {
    // Inverse of `test_i64`: read the little-endian i64 back out.
    let head: [u8; 8] = data.as_bytes()[..8]
        .try_into()
        .expect("slice of length 8 converts to [u8; 8]");
    i64::from_le_bytes(head)
}
// Shared, thread-safe byte sink for capturing tracing output in tests.
// `MakeWriter` is invoked per event, so each call hands out a fresh
// `BufWriter` cloning the same Arc'd buffer.
#[derive(Clone)]
struct BufMakeWriter(Arc<Mutex<Vec<u8>>>);
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for BufMakeWriter {
type Writer = BufWriter;
fn make_writer(&'a self) -> Self::Writer {
BufWriter(Arc::clone(&self.0))
}
}
// Writer handle appending formatted log bytes into the shared buffer.
struct BufWriter(Arc<Mutex<Vec<u8>>>);
impl io::Write for BufWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut guard = self.0.lock().expect("log buffer lock");
guard.extend_from_slice(buf);
// Drop the guard before returning so the lock scope is explicit.
drop(guard);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
// Nothing buffered beyond the Vec itself; flush is a no-op.
Ok(())
}
}
// Runs `f` under a thread-local tracing subscriber that writes into an
// in-memory buffer, returning `f`'s result together with the captured log
// text. DEBUG level + FULL span events so span open/close lines appear.
fn with_tracing_capture<F, R>(f: F) -> (R, String)
where
F: FnOnce() -> R,
{
let buf = Arc::new(Mutex::new(Vec::new()));
let subscriber = tracing_subscriber::fmt()
.with_ansi(false)
.with_max_level(tracing::Level::DEBUG)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_writer(BufMakeWriter(Arc::clone(&buf)))
.finish();
// with_default scopes the subscriber to this thread only, so parallel
// tests do not write into this buffer (but may steal events from us).
let result = tracing::subscriber::with_default(subscriber, f);
let bytes = buf.lock().expect("log buffer lock").clone();
(result, String::from_utf8_lossy(&bytes).to_string())
}
// Liveness probe sanity: the current process must be detected as alive.
#[test]
fn test_process_alive_os_current_pid_is_alive() {
assert!(process_alive_os(
std::process::id(),
logical_now_epoch_secs()
));
}
// A pid above i32::MAX cannot name a live Unix process, so the probe must
// reject it rather than misreport liveness.
#[cfg(unix)]
#[test]
fn test_process_alive_os_rejects_out_of_range_pid() {
let invalid_pid = u32::try_from(i32::MAX)
.expect("i32::MAX must fit u32")
.saturating_add(1);
assert!(!process_alive_os(invalid_pid, logical_now_epoch_secs()));
}
// A snapshot read must emit the `snapshot_read` tracing span and advance
// the global version-traversal metrics. The log check is tolerant because
// metrics are global and parallel tests can interleave subscriber state.
#[test]
fn test_snapshot_read_span_and_metrics() {
let m = mgr();
let pgno = PageNumber::new(44).unwrap();
let mut writer = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut writer, pgno, test_data(0xA5)).unwrap();
assert!(m.commit(&mut writer).is_ok());
let before = crate::observability::mvcc_snapshot_metrics_snapshot();
let mut reader = m.begin(BeginKind::Deferred).unwrap();
let (read, logs) = with_tracing_capture(|| m.read_page(&mut reader, pgno));
assert!(read.is_some());
let after = crate::observability::mvcc_snapshot_metrics_snapshot();
assert!(after.versions_traversed_samples > before.versions_traversed_samples);
assert!(after.versions_traversed_sum > before.versions_traversed_sum);
assert!(after.fsqlite_mvcc_active_snapshots >= 1);
// Span capture can be missed under parallel test interference; warn
// instead of failing since the metric assertions above already passed.
if !logs.contains("snapshot_read") {
eprintln!(
"[WARN] tracing capture missed 'snapshot_read' span \
(parallel test interference); metrics validation passed"
);
}
m.abort(&mut reader);
}
// TxnId allocation: non-zero, 62-bit, unique, strictly increasing.
#[test]
fn test_begin_allocates_txn_id_cas() {
let m = mgr();
let mut ids = Vec::new();
for _ in 0..100 {
let txn = m.begin(BeginKind::Deferred).unwrap();
let raw = txn.txn_id.get();
assert_ne!(raw, 0, "TxnId must never be zero");
assert!(raw <= TxnId::MAX_RAW, "TxnId must fit in 62 bits");
ids.push(raw);
}
let mut sorted = ids.clone();
sorted.sort_unstable();
sorted.dedup();
assert_eq!(sorted.len(), ids.len(), "all TxnIds must be unique");
for window in ids.windows(2) {
assert!(window[0] < window[1], "TxnIds must be strictly increasing");
}
}
// DEFERRED defers snapshot establishment until the first read.
#[test]
fn test_begin_deferred_no_snapshot_until_first_read() {
let m = mgr();
let mut txn = m.begin(BeginKind::Deferred).unwrap();
assert!(
!txn.snapshot_established,
"deferred should not establish snapshot at BEGIN"
);
assert_eq!(txn.mode, TransactionMode::Serialized);
let _ = m.read_page(&mut txn, PageNumber::new(1).unwrap());
assert!(
txn.snapshot_established,
"snapshot should be established after first read"
);
}
// IMMEDIATE takes the serialized write mutex at BEGIN; a second IMMEDIATE
// must fail with BUSY while the first holds it.
#[test]
fn test_begin_immediate_acquires_exclusion() {
let m = mgr();
let txn1 = m.begin(BeginKind::Immediate).unwrap();
assert!(
txn1.serialized_write_lock_held,
"IMMEDIATE should acquire mutex at BEGIN"
);
assert_eq!(m.write_mutex().holder(), Some(txn1.txn_id));
let result = m.begin(BeginKind::Immediate);
assert_eq!(result.unwrap_err(), MvccError::Busy);
}
// EXCLUSIVE also takes the serialized write mutex at BEGIN.
#[test]
fn test_begin_exclusive_acquires_exclusion() {
let m = mgr();
let txn = m.begin(BeginKind::Exclusive).unwrap();
assert!(txn.serialized_write_lock_held);
assert_eq!(m.write_mutex().holder(), Some(txn.txn_id));
}
// CONCURRENT never takes the serialized write mutex.
#[test]
fn test_begin_concurrent_no_exclusion() {
let m = mgr();
let txn1 = m.begin(BeginKind::Concurrent).unwrap();
let txn2 = m.begin(BeginKind::Concurrent).unwrap();
assert_eq!(txn1.mode, TransactionMode::Concurrent);
assert_eq!(txn2.mode, TransactionMode::Concurrent);
assert!(!txn1.serialized_write_lock_held);
assert!(!txn2.serialized_write_lock_held);
assert!(m.write_mutex().holder().is_none());
}
// Reads must see the txn's own uncommitted writes (write-set lookup first).
#[test]
fn test_read_checks_write_set_first() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
let data = test_data(0xAB);
m.write_page(&mut txn, pgno, data).unwrap();
let read_data = m.read_page(&mut txn, pgno).unwrap();
assert_eq!(read_data.as_bytes()[0], 0xAB);
}
// The deferred snapshot is pinned at the first read and never changes after.
#[test]
fn test_read_establishes_deferred_snapshot() {
let m = mgr();
let mut txn = m.begin(BeginKind::Deferred).unwrap();
assert!(!txn.snapshot_established);
let _ = m.read_page(&mut txn, PageNumber::new(1).unwrap());
assert!(txn.snapshot_established);
let snap_high = txn.snapshot.high;
let _ = m.read_page(&mut txn, PageNumber::new(2).unwrap());
assert_eq!(
txn.snapshot.high, snap_high,
"snapshot must not change after establishment"
);
}
// Data committed before BEGIN must be visible to a later reader.
#[test]
fn test_read_visibility_correct() {
let m = mgr();
let mut txn1 = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn1, pgno, test_data(0x01)).unwrap();
let seq = m.commit(&mut txn1).unwrap();
assert!(seq.get() > 0);
let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
let data = m.read_page(&mut txn2, pgno);
assert!(data.is_some(), "committed data should be visible");
assert_eq!(data.unwrap().as_bytes()[0], 0x01);
}
// Each read records the visible version and an SSI witness key.
#[test]
fn test_read_tracks_visible_version_and_witness_key() {
let m = mgr();
let pgno = PageNumber::new(11).unwrap();
let mut writer = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut writer, pgno, test_data(0x22)).unwrap();
let committed = m.commit(&mut writer).unwrap();
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
let data = m.read_page(&mut reader, pgno).unwrap();
assert_eq!(data.as_bytes()[0], 0x22);
assert_eq!(reader.read_version_for_page(pgno), Some(committed));
assert!(
reader
.read_keys
.contains(&fsqlite_types::WitnessKey::Page(pgno)),
"page reads must populate SSI witness keys"
);
}
// record_range_scan must register versions and witness keys for every page.
#[test]
fn test_record_range_scan_tracks_all_pages() {
let m = mgr();
let p1 = PageNumber::new(12).unwrap();
let p2 = PageNumber::new(13).unwrap();
let mut seed = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut seed, p1, test_data(0x31)).unwrap();
m.write_page(&mut seed, p2, test_data(0x32)).unwrap();
let committed = m.commit(&mut seed).unwrap();
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
m.record_range_scan(&mut reader, &[p1, p2]);
assert_eq!(reader.read_version_for_page(p1), Some(committed));
assert_eq!(reader.read_version_for_page(p2), Some(committed));
assert!(
reader
.read_keys
.contains(&fsqlite_types::WitnessKey::Page(p1))
);
assert!(
reader
.read_keys
.contains(&fsqlite_types::WitnessKey::Page(p2))
);
}
// Range scans provide predicate coverage: empty pages inside the range must
// be tracked (witness key, read-set membership, recorded version) too.
#[test]
fn test_read_page_range_tracks_predicate_coverage_including_empty_pages() {
let m = mgr();
let p20 = PageNumber::new(20).unwrap();
let p21 = PageNumber::new(21).unwrap();
let p22 = PageNumber::new(22).unwrap();
let p23 = PageNumber::new(23).unwrap();
let mut seed = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut seed, p20, test_data(0x41)).unwrap();
m.write_page(&mut seed, p22, test_data(0x42)).unwrap();
let committed = m.commit(&mut seed).unwrap();
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
let scanned = m.read_page_range(&mut reader, p20, p23);
assert_eq!(scanned.len(), 4);
assert_eq!(
scanned
.iter()
.filter_map(|(page, data)| data.as_ref().map(|_| page.get()))
.collect::<Vec<_>>(),
vec![20, 22]
);
for page in [p20, p21, p22, p23] {
assert!(
reader
.read_keys
.contains(&fsqlite_types::WitnessKey::Page(page)),
"range read must register page witness for scanned page {}",
page.get()
);
assert!(
reader.read_set_maybe_contains(page),
"range read-set membership must include scanned page {}",
page.get()
);
assert_eq!(
reader.read_version_for_page(page),
Some(committed),
"scanned page {} should record visible version",
page.get()
);
}
}
#[test]
fn test_read_page_range_inverted_bounds_is_noop() {
let m = mgr();
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
let p30 = PageNumber::new(30).unwrap();
let p25 = PageNumber::new(25).unwrap();
let scanned = m.read_page_range(&mut reader, p30, p25);
assert!(scanned.is_empty());
assert!(reader.read_set_versions.is_empty());
assert!(reader.read_keys.is_empty());
}
#[test]
fn test_read_page_range_deferred_uses_established_snapshot_for_empty_pages() {
let m = mgr();
let p40 = PageNumber::new(40).unwrap();
let p41 = PageNumber::new(41).unwrap();
let mut reader = m.begin(BeginKind::Deferred).unwrap();
let begin_snapshot = reader.snapshot.high;
assert!(
!reader.snapshot_established,
"deferred txn should not establish snapshot at BEGIN"
);
let mut seed = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut seed, p40, test_data(0x5A)).unwrap();
let committed = m.commit(&mut seed).unwrap();
assert!(
committed > begin_snapshot,
"intervening commit must advance snapshot high for this regression"
);
let scanned = m.read_page_range(&mut reader, p40, p41);
assert_eq!(scanned.len(), 2);
assert!(reader.snapshot_established);
assert_eq!(reader.read_version_for_page(p40), Some(committed));
assert_eq!(
reader.read_version_for_page(p41),
Some(committed),
"empty pages in deferred range scans must use the established snapshot high"
);
}
fn read_page_range_without_tracking(
    manager: &TransactionManager,
    txn: &mut Transaction,
    start_page: PageNumber,
    end_page: PageNumber,
) -> Vec<(PageNumber, Option<PageData>)> {
    // Baseline scan for the overhead benchmark: issues the same per-page
    // reads as `read_page_range` but through the plain `read_page` path,
    // so any extra cost in the tracked variant is attributable to tracking.
    // An inverted range yields no rows, mirroring the tracked variant.
    if start_page.get() > end_page.get() {
        return Vec::new();
    }
    (start_page.get()..=end_page.get())
        .filter_map(PageNumber::new)
        .map(|page| (page, manager.read_page(txn, page)))
        .collect()
}
fn consume_scan_rows(rows: &[(PageNumber, Option<PageData>)]) -> u64 {
    // Fold every byte of every present page into a wrapping checksum so the
    // benchmark's scan work cannot be optimized away.
    rows.iter()
        .filter_map(|(_, page_data)| page_data.as_ref())
        .flat_map(|page_data| page_data.as_bytes().iter())
        .fold(0_u64, |acc, byte| acc.wrapping_add(u64::from(*byte)))
}
// Best-of-TRIALS comparison of tracked vs untracked range scans; the sleep
// per iteration simulates realistic per-scan work so fixed tracking cost is
// amortized the way it would be in production.
// NOTE(review): the function name says "under five percent" but the
// assertion allows 25% overhead (and the message says <=25%) — confirm
// which threshold is intended and align the name with the bound.
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_range_scan_tracking_overhead_under_five_percent() {
const START_PAGE: u32 = 100;
const END_PAGE: u32 = 227;
const ITERATIONS: u32 = 24;
const TRIALS: usize = 3;
let m = mgr();
let mut seed = m.begin(BeginKind::Immediate).unwrap();
for raw_page in START_PAGE..=END_PAGE {
let page = PageNumber::new(raw_page).unwrap();
m.write_page(&mut seed, page, test_data((raw_page % 251) as u8))
.unwrap();
}
m.commit(&mut seed).unwrap();
let start_page = PageNumber::new(START_PAGE).unwrap();
let end_page = PageNumber::new(END_PAGE).unwrap();
// Untracked baseline: min over trials to reduce scheduler noise.
let baseline_elapsed = (0..TRIALS)
.map(|_| {
let run_start = Instant::now();
for _ in 0..ITERATIONS {
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
let rows =
read_page_range_without_tracking(&m, &mut reader, start_page, end_page);
let checksum =
consume_scan_rows(&rows).rotate_left(7) ^ consume_scan_rows(&rows);
black_box(checksum);
std::thread::sleep(Duration::from_micros(900));
m.abort(&mut reader);
}
run_start.elapsed()
})
.min()
.unwrap_or(Duration::ZERO);
// Tracked run: identical workload via the tracking read path.
let tracked_elapsed = (0..TRIALS)
.map(|_| {
let run_start = Instant::now();
for _ in 0..ITERATIONS {
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
let rows = m.read_page_range(&mut reader, start_page, end_page);
let checksum =
consume_scan_rows(&rows).rotate_left(7) ^ consume_scan_rows(&rows);
black_box(checksum);
std::thread::sleep(Duration::from_micros(900));
m.abort(&mut reader);
}
run_start.elapsed()
})
.min()
.unwrap_or(Duration::ZERO);
// max(EPSILON) guards division by zero; negative overhead clamps to 0.
let baseline_secs = baseline_elapsed.as_secs_f64().max(f64::EPSILON);
let tracked_secs = tracked_elapsed.as_secs_f64();
let overhead_ratio = ((tracked_secs - baseline_secs) / baseline_secs).max(0.0);
assert!(
overhead_ratio <= 0.25,
"range-scan tracking overhead must remain <=25%; baseline={baseline_elapsed:?} tracked={tracked_elapsed:?} overhead={:.2}%",
overhead_ratio * 100.0
);
}
// A deferred txn upgrades lazily: mutex and snapshot arrive on first write.
#[test]
fn test_serialized_deferred_upgrade() {
let m = mgr();
let mut txn = m.begin(BeginKind::Deferred).unwrap();
assert!(
!txn.serialized_write_lock_held,
"DEFERRED: no mutex at BEGIN"
);
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
assert!(
txn.serialized_write_lock_held,
"mutex should be acquired on first write"
);
assert!(
txn.snapshot_established,
"snapshot should be established on first write"
);
}
// A serialized write from a snapshot that predates another commit must be
// rejected with BUSY_SNAPSHOT (first-committer-wins freshness check).
#[test]
fn test_serialized_stale_snapshot_busy() {
let m = mgr();
let mut txn_writer = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn_writer, pgno, test_data(0x01))
.unwrap();
m.commit(&mut txn_writer).unwrap();
let mut txn = m.begin(BeginKind::Deferred).unwrap();
let _ = m.read_page(&mut txn, PageNumber::new(2).unwrap());
assert!(txn.snapshot_established);
let mut txn_writer2 = m.begin(BeginKind::Immediate).unwrap();
m.write_page(
&mut txn_writer2,
PageNumber::new(3).unwrap(),
test_data(0x02),
)
.unwrap();
m.commit(&mut txn_writer2).unwrap();
let result = m.write_page(&mut txn, PageNumber::new(4).unwrap(), test_data(0x03));
assert_eq!(
result.unwrap_err(),
MvccError::BusySnapshot,
"stale snapshot must return BUSY_SNAPSHOT"
);
}
// Serialized mode relies on the global mutex, never per-page locks.
#[test]
fn test_serialized_no_page_lock_needed() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
assert_eq!(
m.lock_table().lock_count(),
0,
"serialized mode should not use page locks"
);
}
// Two concurrent txns contending on one page: second writer gets BUSY.
#[test]
fn test_concurrent_page_lock_acquisition() {
let m = mgr();
let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn1, pgno, test_data(0x01)).unwrap();
let result = m.write_page(&mut txn2, pgno, test_data(0x02));
assert_eq!(
result.unwrap_err(),
MvccError::Busy,
"page lock contention should return BUSY"
);
}
// Each concurrent write records its page lock on the transaction.
#[test]
fn test_concurrent_page_lock_tracked() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let p1 = PageNumber::new(1).unwrap();
let p2 = PageNumber::new(2).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
assert!(txn.page_locks.contains(&p1));
assert!(txn.page_locks.contains(&p2));
assert_eq!(txn.page_locks.len(), 2);
}
// The write set counts distinct pages; rewriting a page does not grow it.
#[test]
fn test_concurrent_write_set_counter() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
for i in 1..=5_u8 {
let pgno = PageNumber::new(u32::from(i)).unwrap();
m.write_page(&mut txn, pgno, test_data(i)).unwrap();
}
assert_eq!(txn.write_set.len(), 5);
assert_eq!(txn.write_set_data.len(), 5);
m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0xFF))
.unwrap();
assert_eq!(
txn.write_set.len(),
5,
"duplicate write should not increase write_set page count"
);
}
// Writes record the base (old) version and SSI witness; commit finalization
// clears the read/write version tracking maps.
#[test]
fn test_write_tracks_base_version_and_clears_tracking_on_commit() {
let m = mgr();
let pgno = PageNumber::new(21).unwrap();
let mut seed = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut seed, pgno, test_data(0x10)).unwrap();
let base_commit = m.commit(&mut seed).unwrap();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut txn, pgno, test_data(0x11)).unwrap();
let tracked_before = txn.write_version_for_page(pgno).unwrap();
assert_eq!(tracked_before.old_version, Some(base_commit));
assert_eq!(tracked_before.new_version, None);
assert!(
txn.write_keys
.contains(&fsqlite_types::WitnessKey::Page(pgno)),
"page writes must populate SSI witness keys"
);
let committed = m.commit(&mut txn).unwrap();
assert!(committed > base_commit);
assert!(
txn.read_set_versions.is_empty(),
"read tracking must be cleared after finalization"
);
assert!(
txn.write_set_versions.is_empty(),
"write tracking must be cleared after finalization"
);
}
// A concurrent write must observe an active serialized writer and back off.
#[test]
fn test_concurrent_checks_serialized_exclusion() {
let m = mgr();
let txn_ser = m.begin(BeginKind::Immediate).unwrap();
assert!(txn_ser.serialized_write_lock_held);
let mut txn_conc = m.begin(BeginKind::Concurrent).unwrap();
let result = m.write_page(&mut txn_conc, PageNumber::new(1).unwrap(), test_data(0x01));
assert_eq!(
result.unwrap_err(),
MvccError::Busy,
"concurrent write while serialized writer active should fail"
);
}
// A procfs-tagged birth token with the wrong start-ticks must be treated as
// a different (dead) process incarnation. Skipped when procfs tagging is
// unavailable (birth token fell back to wall clock).
#[cfg(unix)]
#[test]
fn test_process_alive_os_rejects_tagged_birth_mismatch() {
let pid = std::process::id();
let birth = current_process_birth_token(logical_now_epoch_secs());
if birth & PID_BIRTH_PROCFS_TAG == 0 {
return;
}
assert!(process_alive_os(pid, birth));
let mismatched = PID_BIRTH_PROCFS_TAG | ((birth & !PID_BIRTH_PROCFS_TAG).wrapping_add(1));
assert!(!process_alive_os(pid, mismatched));
}
// A serialized-writer indicator left by a dead process (u32::MAX pid,
// expired lease of 0) must be cleaned up by the concurrent write preflight
// rather than blocking writes forever.
#[cfg(unix)]
#[test]
fn test_concurrent_write_clears_dead_serialized_indicator_without_lease() {
let m = mgr();
let now = logical_now_epoch_secs();
assert!(m.shm.acquire_serialized_writer(777_777, u32::MAX, now, 0));
let pgno = PageNumber::new(1).unwrap();
let mut txn_conc = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut txn_conc, pgno, test_data(0x2A))
.expect("dead serialized indicator should be cleaned and not block writes");
assert!(
m.shm.check_serialized_writer().is_none(),
"stale serialized indicator should be cleared by concurrent preflight"
);
}
// An active concurrent writer forces IMMEDIATE acquisition to time out,
// and the failure path must fully unwind (mutex and shm indicator freed).
#[test]
fn test_concurrent_writer_blocks_serialized_acquisition() {
let m = mgr_with_busy_timeout_ms(5);
let mut txn_conc = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut txn_conc, PageNumber::new(1).unwrap(), test_data(0x01))
.unwrap();
let err = m.begin(BeginKind::Immediate).unwrap_err();
assert_eq!(err, MvccError::Busy);
assert!(m.write_mutex().holder().is_none(), "mutex must be released");
assert!(
m.shm.check_serialized_writer().is_none(),
"indicator must be cleared on failure"
);
}
// IMMEDIATE BEGIN blocks until a background thread aborts the concurrent
// writer, proving the drain waits for page-lock release rather than failing.
#[test]
fn test_drain_waits_for_all_concurrent_locks_released() {
let m = Arc::new(mgr_with_busy_timeout_ms(200));
let mut txn_conc = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut txn_conc, PageNumber::new(1).unwrap(), test_data(0x01))
.unwrap();
let m2 = Arc::clone(&m);
let releaser = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(20));
let mut txn_conc = txn_conc;
m2.abort(&mut txn_conc);
});
let mut txn_ser = m.begin(BeginKind::Immediate).unwrap();
assert!(txn_ser.serialized_write_lock_held);
m.abort(&mut txn_ser);
releaser.join().unwrap();
}
// MVCC readers are never blocked by an active serialized writer.
#[test]
fn test_concurrent_reads_allowed_during_serialized_write() {
let m = mgr();
let pgno = PageNumber::new(1).unwrap();
let mut writer = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut writer, pgno, test_data(0x11)).unwrap();
let seq = m.commit(&mut writer).unwrap();
assert!(seq.get() > 0);
let _ser = m.begin(BeginKind::Immediate).unwrap();
let mut reader = m.begin(BeginKind::Concurrent).unwrap();
let got = m.read_page(&mut reader, pgno).unwrap();
assert_eq!(got.as_bytes()[0], 0x11);
}
// Deferred BEGIN and reads proceed while a concurrent writer holds locks;
// only the deferred txn's write-upgrade hits BUSY.
#[test]
fn test_deferred_read_begin_allowed_during_concurrent_writes() {
let m = mgr_with_busy_timeout_ms(5);
let mut conc = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut conc, PageNumber::new(1).unwrap(), test_data(0x01))
.unwrap();
let mut def = m.begin(BeginKind::Deferred).unwrap();
let _ = m.read_page(&mut def, PageNumber::new(2).unwrap());
let err = m
.write_page(&mut def, PageNumber::new(3).unwrap(), test_data(0x02))
.unwrap_err();
assert_eq!(err, MvccError::Busy);
}
// Full acquisition protocol: indicator published while draining, new
// concurrent writes rejected, acquisition completes after the blocker
// aborts, and teardown restores both exclusion layers.
#[test]
fn test_acquisition_ordering_steps_1_through_5() {
let m = Arc::new(mgr_with_busy_timeout_ms(200));
let mut conc = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut conc, PageNumber::new(1).unwrap(), test_data(0x01))
.unwrap();
let (tx, rx) = mpsc::channel();
let m2 = Arc::clone(&m);
let starter = std::thread::spawn(move || {
let got = m2.begin(BeginKind::Immediate);
tx.send(got).unwrap();
});
// Spin (bounded) until the background IMMEDIATE publishes its indicator.
let wait_start = Instant::now();
while m.shm.check_serialized_writer().is_none() {
assert!(
wait_start.elapsed() <= Duration::from_millis(50),
"timed out waiting for serialized writer indicator"
);
std::thread::yield_now();
}
let mut conc2 = m.begin(BeginKind::Concurrent).unwrap();
let err = m
.write_page(&mut conc2, PageNumber::new(2).unwrap(), test_data(0x02))
.unwrap_err();
assert_eq!(err, MvccError::Busy);
m.abort(&mut conc);
let mut ser = rx
.recv_timeout(Duration::from_millis(200))
.unwrap()
.unwrap();
assert!(ser.serialized_write_lock_held);
starter.join().expect("begin thread panicked");
m.abort(&mut ser);
assert!(m.shm.check_serialized_writer().is_none());
assert!(m.write_mutex().holder().is_none());
let mut conc3 = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut conc3, PageNumber::new(3).unwrap(), test_data(0x03))
.unwrap();
}
// End-to-end: concurrent writes fail while a serialized txn is in flight on
// another thread, and succeed again once it commits.
#[test]
fn test_e2e_serialized_vs_concurrent_mutual_exclusion() {
let m = Arc::new(mgr_with_busy_timeout_ms(200));
let (started_tx, started_rx) = mpsc::channel();
let (commit_tx, commit_rx) = mpsc::channel();
let m2 = Arc::clone(&m);
let th = std::thread::spawn(move || {
let mut ser = m2.begin(BeginKind::Immediate).unwrap();
started_tx.send(()).unwrap();
m2.write_page(&mut ser, PageNumber::new(1).unwrap(), test_data(0xAA))
.unwrap();
commit_rx.recv_timeout(Duration::from_secs(1)).unwrap();
m2.commit(&mut ser).unwrap()
});
started_rx.recv_timeout(Duration::from_secs(1)).unwrap();
let mut conc = m.begin(BeginKind::Concurrent).unwrap();
let err = m
.write_page(&mut conc, PageNumber::new(2).unwrap(), test_data(0xBB))
.unwrap_err();
assert_eq!(err, MvccError::Busy);
commit_tx.send(()).unwrap();
let seq = th.join().unwrap();
assert!(seq.get() > 0);
let mut conc2 = m.begin(BeginKind::Concurrent).unwrap();
m.write_page(&mut conc2, PageNumber::new(2).unwrap(), test_data(0xBB))
.unwrap();
}
// Commit publishes versions to the version store and commit index, then
// releases the serialized write mutex.
#[test]
fn test_commit_serialized_publishes_and_releases() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x42)).unwrap();
let seq = m.commit(&mut txn).unwrap();
assert!(seq.get() > 0);
assert_eq!(txn.state, TransactionState::Committed);
assert!(m.write_mutex().holder().is_none());
let snap = Snapshot::new(seq, SchemaEpoch::ZERO);
assert!(m.version_store().resolve(pgno, &snap).is_some());
assert_eq!(m.commit_index().latest(pgno), Some(seq));
}
// A schema-epoch bump between write and commit must abort the txn.
#[test]
fn test_commit_serialized_schema_epoch_check() {
let mut m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
m.advance_schema_epoch();
let result = m.commit(&mut txn);
assert_eq!(result.unwrap_err(), MvccError::Schema);
assert_eq!(txn.state, TransactionState::Aborted);
}
// First-committer-wins: a commit from a forcibly-staled snapshot is
// rejected with BUSY_SNAPSHOT.
#[test]
fn test_commit_serialized_fcw_freshness() {
let m = mgr();
let mut txn1 = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn1, pgno, test_data(0x01)).unwrap();
m.commit(&mut txn1).unwrap();
let mut txn2 = m.begin(BeginKind::Immediate).unwrap();
m.write_page(&mut txn2, pgno, test_data(0x02)).unwrap();
txn2.snapshot = Snapshot::new(CommitSeq::ZERO, SchemaEpoch::ZERO);
let result = m.commit(&mut txn2);
assert_eq!(
result.unwrap_err(),
MvccError::BusySnapshot,
"FCW should reject stale snapshot"
);
}
// SSI validation: a txn flagged with both in- and out-rw antidependencies
// (the dangerous structure) must abort with BUSY_SNAPSHOT.
#[test]
fn test_commit_concurrent_ssi_validation() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
txn.has_in_rw = true;
txn.has_out_rw = true;
let result = m.commit(&mut txn);
assert_eq!(
result.unwrap_err(),
MvccError::BusySnapshot,
"dangerous SSI structure should abort"
);
assert_eq!(txn.state, TransactionState::Aborted);
}
// A clean concurrent commit succeeds and drops all page locks.
#[test]
fn test_commit_concurrent_successful() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let p1 = PageNumber::new(1).unwrap();
let p2 = PageNumber::new(2).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
let seq = m.commit(&mut txn).unwrap();
assert!(seq.get() > 0);
assert_eq!(txn.state, TransactionState::Committed);
assert_eq!(m.lock_table().lock_count(), 0);
}
// Abort must return every page lock to the shared table.
#[test]
fn test_abort_releases_page_locks() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
assert_eq!(m.lock_table().lock_count(), 1);
m.abort(&mut txn);
assert_eq!(
m.lock_table().lock_count(),
0,
"abort must release all page locks"
);
}
// Aborted writes must never become visible to later transactions.
#[test]
fn test_abort_discards_write_set() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let pgno = PageNumber::new(1).unwrap();
m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
assert!(!txn.write_set.is_empty());
m.abort(&mut txn);
assert_eq!(txn.state, TransactionState::Aborted);
let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
assert!(
m.read_page(&mut txn2, pgno).is_none(),
"aborted data must not be visible"
);
}
// Abort of a serialized txn releases the write mutex.
#[test]
fn test_abort_serialized_releases_mutex() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
assert!(m.write_mutex().holder().is_some());
m.abort(&mut txn);
assert!(
m.write_mutex().holder().is_none(),
"abort must release serialized write mutex"
);
}
// Abort clears version tracking but keeps SSI witnesses (needed for
// post-abort conflict bookkeeping).
#[test]
fn test_abort_concurrent_witnesses_preserved() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let read_pg = PageNumber::new(1).unwrap();
let write_pg = PageNumber::new(2).unwrap();
txn.record_page_read(read_pg, CommitSeq::new(1));
txn.record_page_write(write_pg, Some(CommitSeq::new(1)));
m.abort(&mut txn);
assert!(
txn.read_set_versions.is_empty(),
"read tracking must be cleared on abort"
);
assert!(
txn.write_set_versions.is_empty(),
"write tracking must be cleared on abort"
);
assert!(
!txn.read_keys.is_empty(),
"SSI read witnesses must be preserved after abort"
);
assert!(
!txn.write_keys.is_empty(),
"SSI write witnesses must be preserved after abort"
);
}
// A savepoint snapshots the write-set state at creation time.
#[test]
fn test_savepoint_records_state() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let p1 = PageNumber::new(1).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
let sp = TransactionManager::savepoint(&txn, "sp1");
assert_eq!(sp.name, "sp1");
assert_eq!(sp.write_set_len, 1);
assert!(sp.write_set_snapshot.contains_key(&p1));
}
// ROLLBACK TO truncates the write set back to the savepoint state.
#[test]
fn test_rollback_to_savepoint_restores_pages() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let p1 = PageNumber::new(1).unwrap();
let p2 = PageNumber::new(2).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
let sp = TransactionManager::savepoint(&txn, "sp1");
m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
assert_eq!(txn.write_set.len(), 2);
TransactionManager::rollback_to_savepoint(&mut txn, &sp);
assert_eq!(
txn.write_set.len(),
1,
"write_set should be truncated to savepoint state"
);
assert!(
txn.write_set_data.contains_key(&p1),
"page 1 should still be in write_set_data"
);
assert!(
!txn.write_set_data.contains_key(&p2),
"page 2 should be removed from write_set_data"
);
}
// ROLLBACK TO also prunes write-version tracking for removed pages.
#[test]
fn test_rollback_to_savepoint_prunes_write_version_tracking() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let p1 = PageNumber::new(1).unwrap();
let p2 = PageNumber::new(2).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
let sp = TransactionManager::savepoint(&txn, "sp_tracking");
m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
assert!(txn.write_version_for_page(p1).is_some());
assert!(txn.write_version_for_page(p2).is_some());
TransactionManager::rollback_to_savepoint(&mut txn, &sp);
assert!(txn.write_version_for_page(p1).is_some());
assert!(
txn.write_version_for_page(p2).is_none(),
"savepoint rollback must prune write-version entries for removed pages"
);
}
// Page locks survive ROLLBACK TO (SQLite semantics: locks are held until
// the transaction itself ends).
#[test]
fn test_rollback_to_savepoint_keeps_page_locks() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let p1 = PageNumber::new(1).unwrap();
let p2 = PageNumber::new(2).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
let sp = TransactionManager::savepoint(&txn, "sp1");
m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
assert_eq!(txn.page_locks.len(), 2);
TransactionManager::rollback_to_savepoint(&mut txn, &sp);
assert_eq!(
txn.page_locks.len(),
2,
"page locks must NOT be released on ROLLBACK TO"
);
assert!(txn.page_locks.contains(&p1));
assert!(txn.page_locks.contains(&p2));
}
// SSI witnesses also survive ROLLBACK TO: conflict detection must account
// for everything the txn ever observed, even rolled-back work.
#[test]
fn test_rollback_to_savepoint_keeps_witnesses() {
let m = mgr();
let mut txn = m.begin(BeginKind::Concurrent).unwrap();
let p1 = PageNumber::new(1).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
let sp = TransactionManager::savepoint(&txn, "sp1");
txn.read_keys
.insert(fsqlite_types::WitnessKey::Page(PageNumber::new(2).unwrap()));
txn.write_keys
.insert(fsqlite_types::WitnessKey::Page(PageNumber::new(3).unwrap()));
TransactionManager::rollback_to_savepoint(&mut txn, &sp);
assert!(
!txn.read_keys.is_empty(),
"SSI read witnesses must NOT be rolled back"
);
assert!(
!txn.write_keys.is_empty(),
"SSI write witnesses must NOT be rolled back"
);
}
// Nested savepoints unwind independently, innermost first.
#[test]
fn test_nested_savepoints() {
let m = mgr();
let mut txn = m.begin(BeginKind::Immediate).unwrap();
let p1 = PageNumber::new(1).unwrap();
let p2 = PageNumber::new(2).unwrap();
let p3 = PageNumber::new(3).unwrap();
m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
let sp1 = TransactionManager::savepoint(&txn, "sp1");
m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
let sp2 = TransactionManager::savepoint(&txn, "sp2");
m.write_page(&mut txn, p3, test_data(0x03)).unwrap();
assert_eq!(txn.write_set.len(), 3);
TransactionManager::rollback_to_savepoint(&mut txn, &sp2);
assert_eq!(txn.write_set.len(), 2);
assert!(txn.write_set_data.contains_key(&p1));
assert!(txn.write_set_data.contains_key(&p2));
assert!(!txn.write_set_data.contains_key(&p3));
TransactionManager::rollback_to_savepoint(&mut txn, &sp1);
assert_eq!(txn.write_set.len(), 1);
assert!(txn.write_set_data.contains_key(&p1));
assert!(!txn.write_set_data.contains_key(&p2));
}
#[test]
fn test_state_machine_transitions_irreversible() {
    // Commit and abort move a transaction into terminal states.
    let m = mgr();
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.commit(&mut txn).unwrap();
    assert_eq!(txn.state, TransactionState::Committed);
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.abort(&mut txn2);
    assert_eq!(txn2.state, TransactionState::Aborted);
}
#[test]
#[should_panic(expected = "can only commit active")]
fn test_committed_cannot_commit() {
    // Committing an already-committed transaction violates the state machine.
    let manager = mgr();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    manager.commit(&mut txn).unwrap();
    // Second commit must panic.
    txn.commit();
}
#[test]
#[should_panic(expected = "can only abort active")]
fn test_committed_cannot_abort() {
    // Aborting an already-committed transaction violates the state machine.
    let manager = mgr();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    manager.commit(&mut txn).unwrap();
    // Abort after commit must panic.
    txn.abort();
}
#[test]
#[should_panic(expected = "can only commit active")]
fn test_aborted_cannot_commit() {
    // Committing an already-aborted transaction violates the state machine.
    let manager = mgr();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    manager.abort(&mut txn);
    // Commit after abort must panic.
    txn.commit();
}
#[test]
#[should_panic(expected = "can only abort active")]
fn test_aborted_cannot_abort() {
    // Double-abort violates the state machine.
    let manager = mgr();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    manager.abort(&mut txn);
    // Second abort must panic.
    txn.abort();
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_e2e_full_lifecycle_all_modes() {
    // End-to-end lifecycle across all four BEGIN modes (IMMEDIATE, EXCLUSIVE,
    // DEFERRED, CONCURRENT): each commits, sequence numbers are monotone,
    // a later reader sees all writes, and an old snapshot stays stable.
    let m = mgr();
    let p1 = PageNumber::new(1).unwrap();
    let p2 = PageNumber::new(2).unwrap();
    let p3 = PageNumber::new(3).unwrap();
    let p4 = PageNumber::new(4).unwrap();
    // IMMEDIATE takes the serialized write lock at begin.
    let mut txn_imm = m.begin(BeginKind::Immediate).unwrap();
    assert!(txn_imm.serialized_write_lock_held);
    m.write_page(&mut txn_imm, p1, test_data(0x11)).unwrap();
    let seq_imm = m.commit(&mut txn_imm).unwrap();
    assert!(seq_imm.get() > 0);
    // EXCLUSIVE also holds the serialized write lock.
    let mut txn_exc = m.begin(BeginKind::Exclusive).unwrap();
    assert!(txn_exc.serialized_write_lock_held);
    m.write_page(&mut txn_exc, p2, test_data(0x22)).unwrap();
    let seq_exc = m.commit(&mut txn_exc).unwrap();
    assert!(seq_exc > seq_imm);
    // DEFERRED establishes its snapshot lazily on first read and upgrades to
    // the write lock on first write.
    let mut txn_def = m.begin(BeginKind::Deferred).unwrap();
    assert!(!txn_def.snapshot_established);
    let data_p1 = m.read_page(&mut txn_def, p1).unwrap();
    assert_eq!(data_p1.as_bytes()[0], 0x11);
    assert!(txn_def.snapshot_established);
    m.write_page(&mut txn_def, p3, test_data(0x33)).unwrap();
    assert!(txn_def.serialized_write_lock_held);
    let seq_def = m.commit(&mut txn_def).unwrap();
    assert!(seq_def > seq_exc);
    // CONCURRENT uses page-level locking rather than the serialized mutex.
    let mut txn_conc = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn_conc, p4, test_data(0x44)).unwrap();
    let seq_conc = m.commit(&mut txn_conc).unwrap();
    assert!(seq_conc > seq_def);
    // A fresh reader observes all four committed pages.
    let mut txn_read = m.begin(BeginKind::Concurrent).unwrap();
    let r1 = m.read_page(&mut txn_read, p1).unwrap();
    assert_eq!(r1.as_bytes()[0], 0x11, "page 1 from IMMEDIATE");
    let r2 = m.read_page(&mut txn_read, p2).unwrap();
    assert_eq!(r2.as_bytes()[0], 0x22, "page 2 from EXCLUSIVE");
    let r3 = m.read_page(&mut txn_read, p3).unwrap();
    assert_eq!(r3.as_bytes()[0], 0x33, "page 3 from DEFERRED");
    let r4 = m.read_page(&mut txn_read, p4).unwrap();
    assert_eq!(r4.as_bytes()[0], 0x44, "page 4 from CONCURRENT");
    // Snapshot stability: a commit after txn_old began is invisible to it.
    let mut txn_old = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn_update = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut txn_update, p1, test_data(0xFF)).unwrap();
    m.commit(&mut txn_update).unwrap();
    let r1_old = m.read_page(&mut txn_old, p1).unwrap();
    assert_eq!(
        r1_old.as_bytes()[0],
        0x11,
        "old snapshot should see old version"
    );
}
#[test]
fn test_e2e_concurrent_different_pages_both_commit() {
    // Two CONCURRENT transactions touching different pages never conflict.
    let m = mgr();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    m.write_page(&mut txn2, PageNumber::new(2).unwrap(), test_data(0x02))
        .unwrap();
    let seq1 = m.commit(&mut txn1).unwrap();
    let seq2 = m.commit(&mut txn2).unwrap();
    assert!(seq1.get() > 0);
    assert!(seq2.get() > 0);
    assert_eq!(txn1.state, TransactionState::Committed);
    assert_eq!(txn2.state, TransactionState::Committed);
}
#[test]
fn test_e2e_concurrent_same_page_conflict() {
    // Two live CONCURRENT writers on the same page: the second write fails
    // fast with Busy (page lock held), while the first still commits.
    let m = mgr();
    let pgno = PageNumber::new(1).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0x01)).unwrap();
    let result = m.write_page(&mut txn2, pgno, test_data(0x02));
    assert_eq!(result.unwrap_err(), MvccError::Busy);
    let seq = m.commit(&mut txn1).unwrap();
    assert!(seq.get() > 0);
}
#[test]
fn test_xor_merge_forbidden_btree_interior() {
    // B-tree interior pages are structured: SAFE policy must reject raw XOR
    // merging and route conflicts through intent replay instead.
    let kind = MergePageKind::BtreeInteriorTable;
    assert!(!raw_xor_merge_allowed(WriteMergePolicy::Safe, kind, true));
    assert_eq!(
        merge_decision(WriteMergePolicy::Safe, kind, true),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_xor_merge_forbidden_btree_leaf() {
    // B-tree leaf pages are structured: SAFE policy forbids raw XOR merge
    // and selects intent replay.
    let kind = MergePageKind::BtreeLeafTable;
    assert!(!raw_xor_merge_allowed(WriteMergePolicy::Safe, kind, true));
    assert_eq!(
        merge_decision(WriteMergePolicy::Safe, kind, true),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_xor_merge_forbidden_overflow() {
    // Overflow pages are structured: SAFE policy forbids raw XOR merge
    // and selects intent replay.
    let kind = MergePageKind::Overflow;
    assert!(!raw_xor_merge_allowed(WriteMergePolicy::Safe, kind, true));
    assert_eq!(
        merge_decision(WriteMergePolicy::Safe, kind, true),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_xor_merge_forbidden_freelist() {
    // Freelist pages are structured: SAFE policy forbids raw XOR merge
    // and selects intent replay.
    let kind = MergePageKind::Freelist;
    assert!(!raw_xor_merge_allowed(WriteMergePolicy::Safe, kind, true));
    assert_eq!(
        merge_decision(WriteMergePolicy::Safe, kind, true),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_xor_merge_forbidden_pointer_map() {
    // Pointer-map pages are structured: SAFE policy forbids raw XOR merge
    // and selects intent replay.
    let kind = MergePageKind::PointerMap;
    assert!(!raw_xor_merge_allowed(WriteMergePolicy::Safe, kind, true));
    assert_eq!(
        merge_decision(WriteMergePolicy::Safe, kind, true),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_disjoint_delta_lemma_correct() {
    // Two GF(256) patches touching disjoint byte offsets must compose into a
    // page containing both sets of changes.
    let base = vec![0_u8; 8];
    let mut left = base.clone();
    left[1] = 0xA1;
    left[5] = 0xB2;
    let mut right = base.clone();
    right[2] = 0x0C;
    right[7] = 0x7D;
    let delta_left = gf256_patch_delta(&base, &left).expect("equal lengths");
    let delta_right = gf256_patch_delta(&base, &right).expect("equal lengths");
    assert!(gf256_patches_disjoint(&delta_left, &delta_right));
    let composed = compose_disjoint_gf256_patches(&base, &delta_left, &delta_right)
        .expect("disjoint deltas");
    assert_eq!(composed, vec![0_u8, 0xA1, 0x0C, 0, 0, 0xB2, 0, 0x7D]);
}
#[test]
fn test_counterexample_lost_update() {
    // Counterexample showing why byte-level disjointness is NOT sufficient
    // for semantic merge correctness: T1 moves a record (pointer + payload),
    // T2 updates the payload in its old location. The byte deltas are
    // disjoint and compose cleanly, yet T2's update becomes unreachable.
    let pointer_slot = 0_usize;
    let old_offset = 10_usize;
    let new_offset = 20_usize;
    // Base page: pointer at slot 0 references payload 'A' at old_offset.
    let mut page_0 = vec![0_u8; 64];
    page_0[pointer_slot] = u8::try_from(old_offset).expect("small offset");
    page_0[old_offset] = b'A';
    // T1: relocate the payload and repoint the slot to new_offset.
    let mut page_t1 = page_0.clone();
    page_t1[pointer_slot] = u8::try_from(new_offset).expect("small offset");
    page_t1[new_offset] = page_0[old_offset];
    // T2: overwrite the payload in place at old_offset.
    let mut page_t2 = page_0.clone();
    page_t2[old_offset] = b'B';
    let delta_t1 = gf256_patch_delta(&page_0, &page_t1).expect("equal lengths");
    let delta_t2 = gf256_patch_delta(&page_0, &page_t2).expect("equal lengths");
    assert!(gf256_patches_disjoint(&delta_t1, &delta_t2));
    let merged =
        compose_disjoint_gf256_patches(&page_0, &delta_t1, &delta_t2).expect("disjoint deltas");
    // Follow the logical pointer in the merged page.
    let logical_offset = usize::from(merged[pointer_slot]);
    let logical_payload = merged[logical_offset];
    assert_eq!(
        logical_offset, new_offset,
        "pointer moved to new location by T1"
    );
    assert_eq!(logical_payload, b'A', "stale payload is still reachable");
    assert_eq!(
        merged[old_offset], b'B',
        "T2 update exists at old location but became unreachable"
    );
    assert_ne!(
        logical_payload, b'B',
        "lost update reproduced despite disjoint byte deltas"
    );
}
#[test]
fn test_pragma_write_merge_off() {
    // With merge policy OFF, a stale-snapshot write conflict is never merged:
    // the second committer gets BusySnapshot.
    let mut m = mgr();
    m.set_write_merge_policy(WriteMergePolicy::Off)
        .expect("OFF must be accepted");
    let pgno = PageNumber::new(1).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0x0D)).unwrap();
    m.commit(&mut txn1).unwrap();
    // txn2's snapshot predates txn1's commit to the same page.
    m.write_page(&mut txn2, pgno, test_data(0x0D)).unwrap();
    let result = m.commit(&mut txn2);
    assert_eq!(result.unwrap_err(), MvccError::BusySnapshot);
}
#[test]
fn test_pragma_write_merge_safe() {
    // SAFE policy: the conflicting commit still fails with BusySnapshot here
    // (structured page), and the merge ladder classifies the page as needing
    // intent replay rather than raw XOR merge.
    let mut m = mgr();
    m.set_write_merge_policy(WriteMergePolicy::Safe)
        .expect("SAFE must be accepted");
    let pgno = PageNumber::new(1).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0x0D)).unwrap();
    m.commit(&mut txn1).unwrap();
    m.write_page(&mut txn2, pgno, test_data(0x0D)).unwrap();
    let result = m.commit(&mut txn2);
    assert_eq!(result.unwrap_err(), MvccError::BusySnapshot);
    assert_eq!(
        merge_decision(
            WriteMergePolicy::Safe,
            MergePageKind::classify(test_data(0x0D).as_bytes()),
            cfg!(debug_assertions),
        ),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_pragma_write_merge_lab_unsafe_rejected_in_release() {
    // LAB_UNSAFE is a debug-only policy: accepted under debug_assertions,
    // rejected with InvalidWriteMergePolicy in release builds.
    let mut m = mgr();
    let result = m.set_write_merge_policy(WriteMergePolicy::LabUnsafe);
    if cfg!(debug_assertions) {
        assert!(result.is_ok(), "LAB_UNSAFE is debug-only");
    } else {
        assert_eq!(
            result.unwrap_err(),
            MvccError::InvalidWriteMergePolicy,
            "release builds must reject LAB_UNSAFE"
        );
    }
}
#[test]
fn test_lab_unsafe_still_forbids_btree_xor() {
    // Even the debug-only LAB_UNSAFE policy never allows raw XOR on b-tree
    // leaf pages; the decision is still intent replay.
    let policy = WriteMergePolicy::LabUnsafe;
    let kind = MergePageKind::BtreeLeafTable;
    assert!(!raw_xor_merge_allowed(policy, kind, true));
    assert_eq!(
        merge_decision(policy, kind, true),
        MergeDecision::IntentReplay
    );
}
#[test]
fn test_gf256_delta_as_encoding_not_correctness() {
    // A computable GF(256) delta is merely an encoding of byte differences;
    // it does not grant permission to XOR-merge a structured b-tree page.
    let base = vec![0x0D, 0x10, 0x20, 0x30, 0x40];
    let target = vec![0x0D, 0x11, 0x20, 0x33, 0x40];
    let delta = gf256_patch_delta(&base, &target).expect("equal lengths");
    let has_difference = delta.iter().any(|byte| *byte != 0);
    assert!(has_difference, "delta encodes byte differences");
    let kind = MergePageKind::classify(&target);
    assert_eq!(kind, MergePageKind::BtreeLeafTable);
    let allowed = raw_xor_merge_allowed(WriteMergePolicy::Safe, kind, true);
    assert!(
        !allowed,
        "delta encoding does not imply merge correctness permission"
    );
}
#[test]
fn prop_merge_safety_compile_time() {
    // Every SQLite-structured page kind must refuse raw XOR merge under both
    // the SAFE and LAB_UNSAFE policies.
    let structured_kinds = [
        MergePageKind::BtreeInteriorTable,
        MergePageKind::BtreeLeafTable,
        MergePageKind::Overflow,
        MergePageKind::Freelist,
        MergePageKind::PointerMap,
    ];
    for kind in structured_kinds {
        assert!(kind.is_sqlite_structured());
        for policy in [WriteMergePolicy::Safe, WriteMergePolicy::LabUnsafe] {
            assert!(!raw_xor_merge_allowed(policy, kind, true));
        }
    }
}
proptest! {
    #[test]
    fn prop_disjoint_delta_composition(
        base in prop::collection::vec(any::<u8>(), 1..256),
        even_noise in prop::collection::vec(any::<u8>(), 1..64),
        odd_noise in prop::collection::vec(any::<u8>(), 1..64),
    ) {
        // Property: two deltas that touch strictly disjoint offsets (even vs
        // odd indices) are reported disjoint and compose byte-wise as XOR.
        let len = base.len();
        let mut delta_even = vec![0_u8; len];
        let mut delta_odd = vec![0_u8; len];
        // Populate even offsets only from even_noise (cycled).
        for (idx, byte) in delta_even.iter_mut().enumerate() {
            if idx % 2 == 0 {
                *byte = even_noise[idx % even_noise.len()];
            }
        }
        // Populate odd offsets only from odd_noise (cycled).
        for (idx, byte) in delta_odd.iter_mut().enumerate() {
            if idx % 2 == 1 {
                *byte = odd_noise[idx % odd_noise.len()];
            }
        }
        prop_assert!(gf256_patches_disjoint(&delta_even, &delta_odd));
        let merged = compose_disjoint_gf256_patches(&base, &delta_even, &delta_odd)
            .expect("disjoint deltas should compose");
        // Composition over GF(256) with disjoint supports reduces to XOR.
        for idx in 0..len {
            prop_assert_eq!(merged[idx], base[idx] ^ delta_even[idx] ^ delta_odd[idx]);
        }
    }
}
#[test]
fn test_e2e_concurrent_insert_different_pages() {
    // Concurrent inserts on distinct pages commit independently.
    let m = mgr();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, PageNumber::new(3).unwrap(), test_data(0x01))
        .unwrap();
    m.write_page(&mut txn2, PageNumber::new(4).unwrap(), test_data(0x02))
        .unwrap();
    assert!(m.commit(&mut txn1).is_ok());
    assert!(m.commit(&mut txn2).is_ok());
}
#[test]
fn test_e2e_concurrent_insert_same_page_conflict() {
    // First-committer-wins: a writer whose snapshot predates a committed
    // write to the same page fails at commit with BusySnapshot.
    let m = mgr();
    let pgno = PageNumber::new(5).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0x0D)).unwrap();
    m.commit(&mut txn1).unwrap();
    m.write_page(&mut txn2, pgno, test_data(0x0D)).unwrap();
    assert_eq!(m.commit(&mut txn2).unwrap_err(), MvccError::BusySnapshot);
}
#[test]
fn test_e2e_concurrent_insert_same_page_intent_replay() {
    // Under SAFE policy the conflicting commit still fails here, and the
    // merge ladder classifies the structured page for intent replay.
    let mut m = mgr();
    m.set_write_merge_policy(WriteMergePolicy::Safe)
        .expect("SAFE must be accepted");
    let pgno = PageNumber::new(6).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0x0D)).unwrap();
    m.commit(&mut txn1).unwrap();
    m.write_page(&mut txn2, pgno, test_data(0x0D)).unwrap();
    let result = m.commit(&mut txn2);
    assert_eq!(result.unwrap_err(), MvccError::BusySnapshot);
    assert_eq!(
        merge_decision(
            WriteMergePolicy::Safe,
            MergePageKind::classify(test_data(0x0D).as_bytes()),
            cfg!(debug_assertions),
        ),
        MergeDecision::IntentReplay,
        "SAFE mode should select semantic merge ladder for structured pages"
    );
}
#[test]
fn test_theorem1_deadlock_freedom_try_acquire_never_blocks() {
    // Deadlock freedom: a conflicting write uses try-acquire and returns
    // Busy immediately; 1000 attempts complete well under one second.
    let m = mgr();
    let pgno = PageNumber::new(42).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0xAA)).unwrap();
    let start = Instant::now();
    for _ in 0..1_000 {
        let err = m
            .write_page(&mut txn2, pgno, test_data(0xBB))
            .expect_err("conflicting write must fail immediately");
        assert_eq!(err, MvccError::Busy);
    }
    assert!(
        start.elapsed() < Duration::from_secs(1),
        "non-blocking try_acquire path must return promptly"
    );
}
#[test]
fn test_theorem2_snapshot_isolation_all_or_nothing_visibility() {
    // Snapshot isolation: a multi-page commit is invisible in its entirety
    // to an older snapshot and visible in its entirety to a fresh one.
    let m = mgr();
    let pages = [1_u32, 2, 3];
    // old_reader's snapshot is taken before the writer commits.
    let mut old_reader = m.begin(BeginKind::Concurrent).unwrap();
    let mut writer = m.begin(BeginKind::Immediate).unwrap();
    for page in pages {
        let byte = u8::try_from(page).expect("test page numbers fit in u8");
        m.write_page(&mut writer, PageNumber::new(page).unwrap(), test_data(byte))
            .unwrap();
    }
    let committed = m.commit(&mut writer).unwrap();
    assert!(committed > CommitSeq::ZERO);
    for page in pages {
        assert!(
            m.read_page(&mut old_reader, PageNumber::new(page).unwrap())
                .is_none(),
            "old snapshot must not see post-snapshot commit"
        );
    }
    let mut fresh_reader = m.begin(BeginKind::Concurrent).unwrap();
    for page in pages {
        assert!(
            m.read_page(&mut fresh_reader, PageNumber::new(page).unwrap())
                .is_some(),
            "fresh snapshot must see all committed pages"
        );
    }
}
#[test]
fn test_theorem3_no_lost_update_case_a_lock_contention() {
    // No lost update, case A: while one writer holds the page lock, a second
    // writer's attempt fails immediately with Busy.
    let m = mgr();
    let pgno = PageNumber::new(77).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0x10)).unwrap();
    let result = m.write_page(&mut txn2, pgno, test_data(0x20));
    assert_eq!(result.unwrap_err(), MvccError::Busy);
}
#[test]
fn test_theorem3_no_lost_update_case_b_fcw_stale() {
    // No lost update, case B: first-committer-wins rejects a commit whose
    // snapshot predates another committed write to the same page.
    let m = mgr();
    let pgno = PageNumber::new(88).unwrap();
    let mut txn_fresh = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn_stale = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn_fresh, pgno, test_data(0x01)).unwrap();
    m.commit(&mut txn_fresh).unwrap();
    m.write_page(&mut txn_stale, pgno, test_data(0x02)).unwrap();
    let result = m.commit(&mut txn_stale);
    assert_eq!(result.unwrap_err(), MvccError::BusySnapshot);
}
#[test]
fn test_theorem5_txn_max_duration_enforced() {
    // Max-duration enforcement: an over-age transaction is aborted on its
    // next operation and the serialized write mutex is released.
    let mut m = mgr();
    m.set_txn_max_duration_ms(1);
    let mut txn = m.begin(BeginKind::Immediate).unwrap();
    // Backdate the start timestamp so the 1 ms budget is already exceeded.
    txn.started_at_ms = txn.started_at_ms.saturating_sub(10);
    let result = m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0xEF));
    assert_eq!(result.unwrap_err(), MvccError::TxnMaxDurationExceeded);
    assert_eq!(txn.state, TransactionState::Aborted);
    assert!(m.write_mutex().holder().is_none());
}
#[test]
fn test_theorem6_liveness_all_ops_bounded() {
    // Liveness: read of a missing page terminates with None, write/commit
    // succeed, and abort releases every page lock.
    let m = mgr();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    assert!(
        m.read_page(&mut txn1, PageNumber::new(500).unwrap())
            .is_none(),
        "read on missing page should terminate and return None"
    );
    m.write_page(&mut txn1, PageNumber::new(501).unwrap(), test_data(0x01))
        .unwrap();
    let seq = m.commit(&mut txn1).unwrap();
    assert!(seq > CommitSeq::ZERO);
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn2, PageNumber::new(502).unwrap(), test_data(0x02))
        .unwrap();
    m.abort(&mut txn2);
    assert_eq!(txn2.state, TransactionState::Aborted);
    // Abort must leave the lock table empty.
    assert_eq!(m.lock_table().lock_count(), 0);
}
#[test]
fn test_theorem1_no_dirty_reads() {
    // No dirty reads: an uncommitted write (0x22) is invisible to a reader,
    // which sees the last committed value (0x11).
    let m = mgr();
    let pgno = PageNumber::new(1_201).unwrap();
    let mut seed = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut seed, pgno, test_data(0x11)).unwrap();
    m.commit(&mut seed).unwrap();
    let mut writer = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut writer, pgno, test_data(0x22)).unwrap();
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    let visible = m.read_page(&mut reader, pgno).unwrap();
    assert_eq!(
        visible.as_bytes()[0],
        0x11,
        "reader must not observe uncommitted writer state"
    );
}
#[test]
fn test_theorem1_no_non_repeatable_reads() {
    // No non-repeatable reads: a reader sees the same value for the same
    // page before and after a concurrent committed update.
    let m = mgr();
    let pgno = PageNumber::new(1_202).unwrap();
    let mut seed = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut seed, pgno, test_data(0x33)).unwrap();
    m.commit(&mut seed).unwrap();
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    let first = m.read_page(&mut reader, pgno).unwrap();
    assert_eq!(first.as_bytes()[0], 0x33);
    // Commit an update after the reader's first read.
    let mut writer = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut writer, pgno, test_data(0x44)).unwrap();
    m.commit(&mut writer).unwrap();
    let second = m.read_page(&mut reader, pgno).unwrap();
    assert_eq!(
        second.as_bytes()[0],
        0x33,
        "same transaction must keep a stable snapshot view"
    );
}
#[test]
fn test_theorem1_no_phantom_reads() {
    // No phantoms: a page inserted and committed after the reader's snapshot
    // must not appear in the reader's repeated scan of the same page set.
    let m = mgr();
    let base_pages = [1_301_u32, 1_302, 1_303];
    let phantom_page = 1_304_u32;
    let mut seed = m.begin(BeginKind::Immediate).unwrap();
    for page in base_pages {
        let byte = u8::try_from(page % 251).expect("page modulo 251 always fits in u8");
        m.write_page(&mut seed, PageNumber::new(page).unwrap(), test_data(byte))
            .unwrap();
    }
    m.commit(&mut seed).unwrap();
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    // First scan: only the three seeded pages are visible.
    let mut initial_visible = Vec::new();
    for page in [1_301_u32, 1_302, 1_303, phantom_page] {
        if m.read_page(&mut reader, PageNumber::new(page).unwrap())
            .is_some()
        {
            initial_visible.push(page);
        }
    }
    assert_eq!(initial_visible, vec![1_301_u32, 1_302, 1_303]);
    // Insert the phantom page after the reader's snapshot was taken.
    let mut inserter = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(
        &mut inserter,
        PageNumber::new(phantom_page).unwrap(),
        test_data(0x7E),
    )
    .unwrap();
    m.commit(&mut inserter).unwrap();
    // Second scan: the result set must be unchanged.
    let mut second_visible = Vec::new();
    for page in [1_301_u32, 1_302, 1_303, phantom_page] {
        if m.read_page(&mut reader, PageNumber::new(page).unwrap())
            .is_some()
        {
            second_visible.push(page);
        }
    }
    assert_eq!(
        second_visible,
        vec![1_301_u32, 1_302, 1_303],
        "reader snapshot must not gain new rows/pages mid-transaction"
    );
}
#[test]
fn test_theorem1_committed_writes_visible_in_later_snapshots() {
    // A snapshot taken after a commit must observe that commit.
    let m = mgr();
    let pgno = PageNumber::new(1_205).unwrap();
    let mut writer = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut writer, pgno, test_data(0x5A)).unwrap();
    let committed = m.commit(&mut writer).unwrap();
    assert!(committed > CommitSeq::ZERO);
    let mut later_reader = m.begin(BeginKind::Concurrent).unwrap();
    let read = m.read_page(&mut later_reader, pgno).unwrap();
    assert_eq!(
        read.as_bytes()[0],
        0x5A,
        "later snapshots must observe committed writes"
    );
}
#[test]
fn test_theorem2_write_skew_detected() {
    // Write skew / SSI: a pivot with both incoming and outgoing
    // rw-antidependencies (the dangerous structure) must abort at commit.
    let m = mgr();
    let mut pivot = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut pivot, PageNumber::new(1_401).unwrap(), test_data(0xA1))
        .unwrap();
    // Simulate both rw-edges directly on the transaction.
    pivot.has_in_rw = true;
    pivot.has_out_rw = true;
    let result = m.commit(&mut pivot);
    assert_eq!(
        result.unwrap_err(),
        MvccError::BusySnapshot,
        "dangerous rw-rw structure must abort"
    );
    assert_eq!(pivot.state, TransactionState::Aborted);
}
#[test]
fn test_theorem2_non_conflicting_concurrent_commits() {
    // Transactions with no rw conflicts (disjoint pages) both commit.
    let m = mgr();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, PageNumber::new(1_402).unwrap(), test_data(0x01))
        .unwrap();
    m.write_page(&mut txn2, PageNumber::new(1_403).unwrap(), test_data(0x02))
        .unwrap();
    let seq1 = m.commit(&mut txn1).unwrap();
    let seq2 = m.commit(&mut txn2).unwrap();
    assert!(seq1 > CommitSeq::ZERO);
    assert!(seq2 > CommitSeq::ZERO);
}
#[test]
fn test_theorem2_rw_antidependency_tracking() {
    // rw-antidependency: a page read by `reader` is overwritten and committed
    // after the reader's snapshot; the commit index exposes that, and the
    // test mirrors the detection by setting has_out_rw accordingly.
    let m = mgr();
    let pgno = PageNumber::new(1_404).unwrap();
    let mut seed = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut seed, pgno, test_data(0x10)).unwrap();
    m.commit(&mut seed).unwrap();
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut reader, pgno).unwrap();
    let mut writer = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut writer, pgno, test_data(0x20)).unwrap();
    m.commit(&mut writer).unwrap();
    // The latest commit for pgno is now newer than the reader's snapshot.
    if m.commit_index()
        .latest(pgno)
        .is_some_and(|latest| latest > reader.snapshot.high)
    {
        reader.has_out_rw = true;
    }
    assert!(
        reader.has_out_rw,
        "reader must record outgoing rw-antidependency when a post-snapshot write commits"
    );
}
#[test]
fn test_theorem2_dangerous_structure_two_rw_edges() {
    // Both rw-edges set => has_dangerous_structure() is true and the commit
    // is rejected with BusySnapshot.
    let m = mgr();
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn, PageNumber::new(1_405).unwrap(), test_data(0x77))
        .unwrap();
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert!(txn.has_dangerous_structure());
    assert_eq!(m.commit(&mut txn).unwrap_err(), MvccError::BusySnapshot);
}
#[test]
fn test_theorem3_case_a_concurrent_lock_contention() {
    // Alias test: re-runs the case-A lock-contention scenario under this name.
    test_theorem3_no_lost_update_case_a_lock_contention();
}
#[test]
fn test_theorem3_case_b_fcw_stale_snapshot() {
    // Alias test: re-runs the case-B first-committer-wins scenario.
    test_theorem3_no_lost_update_case_b_fcw_stale();
}
#[test]
fn test_theorem3_case_b_fresh_snapshot_ok() {
    // Counterpart to case B: a writer that begins AFTER the prior commit has
    // a fresh snapshot and must be allowed to commit.
    let m = mgr();
    let pgno = PageNumber::new(1_406).unwrap();
    let mut first_writer = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut first_writer, pgno, test_data(0xA0))
        .unwrap();
    let seq1 = m.commit(&mut first_writer).unwrap();
    let mut second_writer = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut second_writer, pgno, test_data(0xB0))
        .unwrap();
    let seq2 = m.commit(&mut second_writer).unwrap();
    assert!(
        seq2 > seq1,
        "writer with fresh snapshot must commit after prior write"
    );
}
#[test]
fn test_theorem6_begin_is_nonblocking() {
    // begin/abort must be non-blocking: 1000 iterations complete well within
    // the generous wall-clock budget.
    const ATTEMPTS: u32 = 1_000;
    const MAX_ELAPSED: Duration = Duration::from_secs(2);
    let m = mgr();
    let start = Instant::now();
    for _ in 0..ATTEMPTS {
        let mut txn = m.begin(BeginKind::Concurrent).unwrap();
        m.abort(&mut txn);
    }
    assert!(
        start.elapsed() < MAX_ELAPSED,
        "begin/abort loop must remain bounded"
    );
}
#[test]
fn test_theorem6_read_bounded_by_chain_length() {
    // Reads stay fast even after 128 committed versions build up a deep
    // version chain; the newest committed value is returned.
    const VERSIONS: u16 = 128;
    const MAX_ELAPSED: Duration = Duration::from_secs(1);
    let m = mgr();
    let pgno = PageNumber::new(1_407).unwrap();
    for i in 0..VERSIONS {
        let byte = u8::try_from(u32::from(i) % 251).expect("modulo bounds u8");
        let mut writer = m.begin(BeginKind::Concurrent).unwrap();
        m.write_page(&mut writer, pgno, test_data(byte)).unwrap();
        m.commit(&mut writer).unwrap();
    }
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    let start = Instant::now();
    let read = m.read_page(&mut reader, pgno).unwrap();
    assert!(
        start.elapsed() < MAX_ELAPSED,
        "read should terminate quickly even on deep chains"
    );
    // The fresh snapshot sees the last committed byte pattern.
    let expected_last = u8::try_from((u32::from(VERSIONS) - 1) % 251).expect("u8 bound");
    assert_eq!(read.as_bytes()[0], expected_last);
}
#[test]
fn test_theorem6_write_concurrent_nonblocking() {
    // Contended writes fail fast with Busy rather than blocking; 1000
    // attempts complete within the budget.
    const ATTEMPTS: u32 = 1_000;
    const MAX_ELAPSED: Duration = Duration::from_secs(1);
    let m = mgr();
    let pgno = PageNumber::new(1_408).unwrap();
    let mut holder = m.begin(BeginKind::Concurrent).unwrap();
    let mut waiter = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut holder, pgno, test_data(0xAA)).unwrap();
    let start = Instant::now();
    for _ in 0..ATTEMPTS {
        let err = m
            .write_page(&mut waiter, pgno, test_data(0xBB))
            .expect_err("lock contention must fail fast");
        assert_eq!(err, MvccError::Busy);
    }
    assert!(
        start.elapsed() < MAX_ELAPSED,
        "write path must be non-blocking under contention"
    );
}
#[test]
fn test_theorem6_commit_bounded() {
    // Commit of a 64-page write set completes within a bounded time budget.
    const PAGES: u32 = 64;
    const MAX_ELAPSED: Duration = Duration::from_secs(1);
    let m = mgr();
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    for offset in 0..PAGES {
        let pgno = PageNumber::new(1_500 + offset).unwrap();
        let byte = u8::try_from(offset % 251).expect("modulo bounds u8");
        m.write_page(&mut txn, pgno, test_data(byte)).unwrap();
    }
    let start = Instant::now();
    let seq = m.commit(&mut txn).unwrap();
    assert!(seq > CommitSeq::ZERO);
    assert!(
        start.elapsed() < MAX_ELAPSED,
        "commit must finish in bounded time"
    );
}
#[test]
fn test_theorem6_abort_bounded() {
    // Abort of a 64-page write set completes within a bounded time budget
    // and releases every page lock.
    const PAGES: u32 = 64;
    const MAX_ELAPSED: Duration = Duration::from_secs(1);
    let m = mgr();
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    for offset in 0..PAGES {
        let pgno = PageNumber::new(1_600 + offset).unwrap();
        let byte = u8::try_from(offset % 251).expect("modulo bounds u8");
        m.write_page(&mut txn, pgno, test_data(byte)).unwrap();
    }
    let start = Instant::now();
    m.abort(&mut txn);
    assert_eq!(txn.state, TransactionState::Aborted);
    assert_eq!(m.lock_table().lock_count(), 0);
    assert!(
        start.elapsed() < MAX_ELAPSED,
        "abort must finish in bounded time"
    );
}
#[test]
fn test_theorems_under_serialized_mode() {
    // The same invariants hold for serialized (DEFERRED/IMMEDIATE) modes:
    // stable reader snapshot, stale-writer rejection, and liveness.
    let m = mgr();
    let pgno = PageNumber::new(1_701).unwrap();
    let mut seed = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut seed, pgno, test_data(0x10)).unwrap();
    m.commit(&mut seed).unwrap();
    // Deferred reader pins its snapshot at first read.
    let mut reader = m.begin(BeginKind::Deferred).unwrap();
    let first_read = m.read_page(&mut reader, pgno).unwrap();
    assert_eq!(first_read.as_bytes()[0], 0x10);
    let mut updater = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut updater, pgno, test_data(0x20)).unwrap();
    m.commit(&mut updater).unwrap();
    let second_read = m.read_page(&mut reader, pgno).unwrap();
    assert_eq!(
        second_read.as_bytes()[0],
        0x10,
        "serialized reader keeps stable snapshot"
    );
    // A deferred txn whose snapshot is stale fails on write upgrade.
    let mut stale = m.begin(BeginKind::Deferred).unwrap();
    let _ = m.read_page(&mut stale, pgno).unwrap();
    let mut writer2 = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut writer2, pgno, test_data(0x30)).unwrap();
    m.commit(&mut writer2).unwrap();
    assert_eq!(
        m.write_page(&mut stale, pgno, test_data(0x40)).unwrap_err(),
        MvccError::BusySnapshot
    );
    // Liveness: an unrelated IMMEDIATE txn still makes progress.
    let mut liveness = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(
        &mut liveness,
        PageNumber::new(1_702).unwrap(),
        test_data(0xAA),
    )
    .unwrap();
    assert!(m.commit(&mut liveness).is_ok());
}
#[test]
fn test_all_theorems_under_concurrent_workload() {
    // Alias test: re-runs the combined six-theorem workload under this name.
    test_e2e_all_six_theorems_under_concurrent_workload();
}
proptest! {
    #[test]
    fn prop_snapshot_isolation_holds(base in any::<u8>(), delta in 1_u8..=u8::MAX) {
        // Property: for any base byte and non-zero delta, a reader's view of
        // a page is unchanged by a concurrent committed overwrite.
        let m = mgr();
        let pgno = PageNumber::new(1_703).unwrap();
        let next = base.wrapping_add(delta);
        let mut seed = m.begin(BeginKind::Immediate).unwrap();
        m.write_page(&mut seed, pgno, test_data(base)).unwrap();
        m.commit(&mut seed).unwrap();
        let mut reader = m.begin(BeginKind::Concurrent).unwrap();
        let first = m.read_page(&mut reader, pgno).unwrap();
        prop_assert_eq!(first.as_bytes()[0], base);
        let mut writer = m.begin(BeginKind::Immediate).unwrap();
        m.write_page(&mut writer, pgno, test_data(next)).unwrap();
        m.commit(&mut writer).unwrap();
        // Re-read through the original snapshot: still the old byte.
        let second = m.read_page(&mut reader, pgno).unwrap();
        prop_assert_eq!(second.as_bytes()[0], base);
    }
    #[test]
    fn prop_memory_bounded(commits in 1_u8..48_u8) {
        // Property: after N sequential commits to one page, the version chain
        // never exceeds N+1 entries.
        let m = mgr();
        let pgno = PageNumber::new(1_704).unwrap();
        let commit_count = u32::from(commits);
        for step in 0..commit_count {
            let mut writer = m.begin(BeginKind::Concurrent).unwrap();
            let byte = u8::try_from(step % 251).expect("modulo bounds u8");
            m.write_page(&mut writer, pgno, test_data(byte)).unwrap();
            m.commit(&mut writer).unwrap();
        }
        let chain_len = m.version_store().walk_chain(pgno).len();
        let theoretical_bound = usize::try_from(commit_count + 1).unwrap();
        prop_assert!(chain_len <= theoretical_bound);
    }
}
#[test]
fn test_e2e_all_six_theorems_under_concurrent_workload() {
    // Combined workload exercising the MVCC invariants checked individually
    // elsewhere: fail-fast lock contention, snapshot isolation,
    // first-committer-wins, and bounded version-chain growth.
    const R: u64 = 8;
    const D_SECONDS: u64 = 1;
    let m = mgr();
    let pg_conflict = PageNumber::new(900).unwrap();
    // Contention: a second live writer on the same page gets Busy at once.
    let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut t1, pg_conflict, test_data(0x11)).unwrap();
    assert_eq!(
        m.write_page(&mut t2, pg_conflict, test_data(0x22))
            .unwrap_err(),
        MvccError::Busy
    );
    m.commit(&mut t1).unwrap();
    // Snapshot isolation: a reader begun before the multi-page commit sees
    // none of it.
    let mut old_reader = m.begin(BeginKind::Concurrent).unwrap();
    let mut writer = m.begin(BeginKind::Immediate).unwrap();
    for (page, byte) in [(901_u32, 0x31_u8), (902_u32, 0x32_u8), (903_u32, 0x33_u8)] {
        m.write_page(&mut writer, PageNumber::new(page).unwrap(), test_data(byte))
            .unwrap();
    }
    m.commit(&mut writer).unwrap();
    for page in [901_u32, 902, 903] {
        assert!(
            m.read_page(&mut old_reader, PageNumber::new(page).unwrap())
                .is_none()
        );
    }
    // First-committer-wins: a stale snapshot loses to the earlier committer.
    let mut stale = m.begin(BeginKind::Concurrent).unwrap();
    let mut fresh = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut fresh, PageNumber::new(904).unwrap(), test_data(0x44))
        .unwrap();
    m.commit(&mut fresh).unwrap();
    m.write_page(&mut stale, PageNumber::new(904).unwrap(), test_data(0x55))
        .unwrap();
    assert_eq!(m.commit(&mut stale).unwrap_err(), MvccError::BusySnapshot);
    // Bounded memory: R*D+1 sequential commits to one page must not grow the
    // version chain past that envelope.
    let bound = R * D_SECONDS + 1;
    for i in 0..bound {
        let mut txn = m.begin(BeginKind::Concurrent).unwrap();
        // Checked conversion keeps this file's `try_from` convention instead
        // of a silently-truncating `as` cast.
        let byte = u8::try_from(i % 251).expect("modulo bounds u8");
        m.write_page(&mut txn, PageNumber::new(905).unwrap(), test_data(byte))
            .unwrap();
        m.commit(&mut txn).unwrap();
    }
    let chain = m.version_store().walk_chain(PageNumber::new(905).unwrap());
    assert!(
        chain.len() <= usize::try_from(bound).unwrap(),
        "bounded run should not exceed configured R*D+1 envelope"
    );
}
#[test]
fn test_e2e_safety_proofs_backed_by_executable_checks() {
    // Alias test: the safety claims are backed by the executable workload.
    test_e2e_all_six_theorems_under_concurrent_workload();
}
/// Build a default-size page of zeroes with a single `value` byte planted at
/// `offset` — used to craft pages whose deltas touch known positions.
fn test_data_at(offset: usize, value: u8) -> PageData {
    let mut page = PageData::zeroed(PageSize::DEFAULT);
    page.as_bytes_mut()[offset] = value;
    page
}
#[test]
fn test_first_committer_wins() {
    // With merging OFF, the second committer to the same page must receive
    // BusySnapshot (SQLITE_BUSY_SNAPSHOT) and end up aborted.
    let mut m = mgr();
    m.set_write_merge_policy(WriteMergePolicy::Off).unwrap();
    let pgno = PageNumber::new(1).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pgno, test_data(0xAA)).unwrap();
    let seq1 = m.commit(&mut txn1).unwrap();
    assert!(seq1.get() > 0);
    m.write_page(&mut txn2, pgno, test_data(0xBB)).unwrap();
    let result = m.commit(&mut txn2);
    assert_eq!(
        result.unwrap_err(),
        MvccError::BusySnapshot,
        "second committer must get SQLITE_BUSY_SNAPSHOT"
    );
    assert_eq!(txn2.state, TransactionState::Aborted);
}
#[test]
fn test_no_conflict_different_pages() {
    // Writers on disjoint pages commit with strictly increasing sequences.
    let m = mgr();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, PageNumber::new(1).unwrap(), test_data(0xAA))
        .unwrap();
    m.write_page(&mut txn2, PageNumber::new(2).unwrap(), test_data(0xBB))
        .unwrap();
    let seq1 = m.commit(&mut txn1).unwrap();
    let seq2 = m.commit(&mut txn2).unwrap();
    assert!(seq1.get() > 0);
    assert!(seq2.get() > seq1.get());
}
#[test]
fn test_conflict_with_successful_rebase() {
    // Rebase path: two writers change DIFFERENT bytes of the same page.
    // The later commit is rebased onto the earlier one and both edits land
    // in the merged head version.
    let m = mgr();
    let pgno = PageNumber::new(10).unwrap();
    // Seed a zeroed base version of the page.
    let mut txn0 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn0, pgno, PageData::zeroed(PageSize::DEFAULT))
        .unwrap();
    m.commit(&mut txn0).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    // txn1 edits byte 0; txn2 edits byte 1 — disjoint byte ranges.
    m.write_page(&mut txn1, pgno, test_data_at(0, 0xAA))
        .unwrap();
    m.commit(&mut txn1).unwrap();
    m.write_page(&mut txn2, pgno, test_data_at(1, 0xBB))
        .unwrap();
    let seq2 = m.commit(&mut txn2);
    assert!(
        seq2.is_ok(),
        "disjoint byte changes should be rebasable: {seq2:?}"
    );
    // The chain head contains both writers' bytes.
    let head_idx = m.version_store().chain_head(pgno).unwrap();
    let merged = m.version_store().get_version(head_idx).unwrap();
    assert_eq!(merged.data.as_bytes()[0], 0xAA, "T1's change preserved");
    assert_eq!(merged.data.as_bytes()[1], 0xBB, "T2's change preserved");
}
#[test]
// A same-page conflict where both txns changed the SAME byte cannot be
// rebased; the second committer must receive BusySnapshot.
fn test_conflict_response_sqlite_busy() {
    let m = mgr();
    let pgno = PageNumber::new(20).unwrap();
    // Seed a base version.
    let mut txn0 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn0, pgno, PageData::zeroed(PageSize::DEFAULT))
        .unwrap();
    m.commit(&mut txn0).unwrap();
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    // Both transactions write byte 0 — an overlapping, non-mergeable edit.
    m.write_page(&mut txn1, pgno, test_data_at(0, 0xAA))
        .unwrap();
    m.commit(&mut txn1).unwrap();
    m.write_page(&mut txn2, pgno, test_data_at(0, 0xBB))
        .unwrap();
    let result = m.commit(&mut txn2);
    assert_eq!(
        result.unwrap_err(),
        MvccError::BusySnapshot,
        "non-rebasable conflict must return SQLITE_BUSY_SNAPSHOT"
    );
}
#[test]
// The commit index must report, per page, the latest commit sequence that
// touched it: None before any write, then the most recent committing seq.
fn test_commit_index_lookup_correctness() {
    let m = mgr();
    let pg1 = PageNumber::new(100).unwrap();
    let pg2 = PageNumber::new(200).unwrap();
    // No commits yet: both pages unknown to the index.
    assert_eq!(m.commit_index().latest(pg1), None);
    assert_eq!(m.commit_index().latest(pg2), None);
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, pg1, test_data(0x01)).unwrap();
    let seq1 = m.commit(&mut txn1).unwrap();
    // Only pg1 was committed; pg2 stays unindexed.
    assert_eq!(m.commit_index().latest(pg1), Some(seq1));
    assert_eq!(m.commit_index().latest(pg2), None);
    // A later txn touching both pages moves both entries to its seq.
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn2, pg1, test_data(0x02)).unwrap();
    m.write_page(&mut txn2, pg2, test_data(0x03)).unwrap();
    let seq2 = m.commit(&mut txn2).unwrap();
    assert_eq!(m.commit_index().latest(pg1), Some(seq2));
    assert_eq!(m.commit_index().latest(pg2), Some(seq2));
    assert!(seq2 > seq1, "later commit has higher seq");
}
#[test]
// End-to-end first-committer-wins: disjoint-byte conflicts rebase and both
// commit; overlapping-byte conflicts abort the second committer and leave
// the chain head at the winner's version.
fn test_e2e_first_committer_wins_conflict_response() {
    let m = mgr();
    let pgno = PageNumber::new(50).unwrap();
    // Seed a base version and record its commit seq.
    let mut txn0 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn0, pgno, PageData::zeroed(PageSize::DEFAULT))
        .unwrap();
    let base_seq = m.commit(&mut txn0).unwrap();
    // Both writers snapshot at the seeded commit.
    let mut w1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut w2 = m.begin(BeginKind::Concurrent).unwrap();
    assert_eq!(w1.snapshot.high, base_seq);
    assert_eq!(w2.snapshot.high, base_seq);
    // Round 1: disjoint bytes (0 vs 4) — w2 rebases onto w1 and commits.
    m.write_page(&mut w1, pgno, test_data_at(0, 0x11)).unwrap();
    let seq1 = m.commit(&mut w1).unwrap();
    assert!(seq1 > base_seq);
    m.write_page(&mut w2, pgno, test_data_at(4, 0x22)).unwrap();
    let seq2 = m.commit(&mut w2).unwrap();
    assert!(seq2 > seq1);
    // Merged head carries both writers' bytes and the later commit seq.
    let head = m.version_store().chain_head(pgno).unwrap();
    let merged = m.version_store().get_version(head).unwrap();
    assert_eq!(merged.data.as_bytes()[0], 0x11, "W1 byte preserved");
    assert_eq!(merged.data.as_bytes()[4], 0x22, "W2 byte preserved");
    assert_eq!(merged.commit_seq, seq2);
    // Round 2: overlapping byte 0 — w4 must fail, w3's version stays head.
    let mut w3 = m.begin(BeginKind::Concurrent).unwrap();
    let mut w4 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut w3, pgno, test_data_at(0, 0x33)).unwrap();
    let seq3 = m.commit(&mut w3).unwrap();
    assert!(seq3 > seq2);
    m.write_page(&mut w4, pgno, test_data_at(0, 0x44)).unwrap();
    let result = m.commit(&mut w4);
    assert_eq!(
        result.unwrap_err(),
        MvccError::BusySnapshot,
        "overlapping byte conflict must fail deterministically"
    );
    let final_head = m.version_store().chain_head(pgno).unwrap();
    let final_ver = m.version_store().get_version(final_head).unwrap();
    assert_eq!(final_ver.data.as_bytes()[0], 0x33);
    assert_eq!(final_ver.commit_seq, seq3);
}
#[test]
// BEGIN DEFERRED acquires nothing up front: neither the global write mutex
// nor the per-transaction serialized write lock may be held at BEGIN.
fn test_begin_deferred_no_write_lock() {
    let m = mgr();
    let deferred = m.begin(BeginKind::Deferred).unwrap();
    assert!(
        m.write_mutex().holder().is_none(),
        "DEFERRED must not hold write mutex at BEGIN"
    );
    assert!(
        !deferred.serialized_write_lock_held,
        "DEFERRED must not hold serialized_write_lock at BEGIN"
    );
}
#[test]
// A DEFERRED txn upgrades lazily: the serialized write mutex and the
// snapshot are both established at the first write, not at BEGIN.
fn test_deferred_upgrade_on_first_write() {
    let m = mgr();
    let mut txn = m.begin(BeginKind::Deferred).unwrap();
    assert!(
        !txn.serialized_write_lock_held,
        "no mutex before first write"
    );
    let pgno = PageNumber::new(1).unwrap();
    // First write triggers the upgrade.
    m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
    assert!(
        txn.serialized_write_lock_held,
        "mutex must be acquired on first write"
    );
    assert!(
        txn.snapshot_established,
        "snapshot must be established on first write"
    );
    assert_eq!(
        m.write_mutex().holder(),
        Some(txn.txn_id),
        "write mutex must be held by this txn"
    );
}
#[test]
// BEGIN IMMEDIATE takes the serialized write mutex eagerly; a second
// IMMEDIATE while it is held fails fast with Busy.
fn test_begin_immediate_acquires_write_lock() {
    let m = mgr();
    let txn1 = m.begin(BeginKind::Immediate).unwrap();
    assert!(
        txn1.serialized_write_lock_held,
        "IMMEDIATE must acquire mutex at BEGIN"
    );
    assert_eq!(m.write_mutex().holder(), Some(txn1.txn_id));
    let result = m.begin(BeginKind::Immediate);
    assert_eq!(
        result.unwrap_err(),
        MvccError::Busy,
        "second IMMEDIATE must get SQLITE_BUSY"
    );
}
#[test]
// BEGIN EXCLUSIVE behaves like IMMEDIATE here (WAL-mode semantics): it
// takes the serialized write mutex and a second EXCLUSIVE gets Busy.
fn test_begin_exclusive_acquires_write_lock() {
    let m = mgr();
    let txn = m.begin(BeginKind::Exclusive).unwrap();
    assert!(
        txn.serialized_write_lock_held,
        "EXCLUSIVE must acquire mutex at BEGIN"
    );
    assert_eq!(m.write_mutex().holder(), Some(txn.txn_id));
    let result = m.begin(BeginKind::Exclusive);
    assert_eq!(
        result.unwrap_err(),
        MvccError::Busy,
        "second EXCLUSIVE must get SQLITE_BUSY (identical to IMMEDIATE in WAL mode)"
    );
}
#[test]
// Multiple DEFERRED readers may read simultaneously; each establishes its
// own snapshot on first read and none of them touches the write mutex.
fn test_concurrent_readers_no_block() {
    let m = mgr();
    let mut r1 = m.begin(BeginKind::Deferred).unwrap();
    let mut r2 = m.begin(BeginKind::Deferred).unwrap();
    let mut r3 = m.begin(BeginKind::Deferred).unwrap();
    let pgno = PageNumber::new(1).unwrap();
    // Page may not exist yet; only the snapshot side effect matters here.
    let _ = m.read_page(&mut r1, pgno);
    let _ = m.read_page(&mut r2, pgno);
    let _ = m.read_page(&mut r3, pgno);
    assert!(r1.snapshot_established);
    assert!(r2.snapshot_established);
    assert!(r3.snapshot_established);
    assert!(
        m.write_mutex().holder().is_none(),
        "readers must never hold write mutex"
    );
}
#[test]
// WAL semantics: an active serialized writer must not block a reader; the
// reader sees the last committed version of the page.
fn test_writer_does_not_block_readers() {
    let m = mgr();
    let pgno = PageNumber::new(1).unwrap();
    // Commit a known value first.
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, pgno, test_data(0x11)).unwrap();
    m.commit(&mut setup).unwrap();
    // Hold the write mutex with an open IMMEDIATE txn.
    let _ser = m.begin(BeginKind::Immediate).unwrap();
    assert!(m.write_mutex().holder().is_some());
    let mut reader = m.begin(BeginKind::Deferred).unwrap();
    let data = m.read_page(&mut reader, pgno);
    assert!(
        data.is_some(),
        "reader must not be blocked by active writer (WAL semantics)"
    );
    assert_eq!(data.unwrap().as_bytes()[0], 0x11);
}
#[test]
// While one serialized writer holds the mutex, every other path into the
// writer role (IMMEDIATE, EXCLUSIVE, DEFERRED upgrade-on-write) gets Busy.
fn test_single_writer_serialization() {
    let m = mgr();
    let txn1 = m.begin(BeginKind::Immediate).unwrap();
    assert!(txn1.serialized_write_lock_held);
    assert_eq!(
        m.begin(BeginKind::Immediate).unwrap_err(),
        MvccError::Busy,
        "second IMMEDIATE writer must get SQLITE_BUSY"
    );
    assert_eq!(
        m.begin(BeginKind::Exclusive).unwrap_err(),
        MvccError::Busy,
        "EXCLUSIVE writer must also get SQLITE_BUSY"
    );
    // A DEFERRED txn begins fine but fails when it tries to upgrade.
    let mut def = m.begin(BeginKind::Deferred).unwrap();
    assert!(!def.serialized_write_lock_held);
    let err = m.write_page(&mut def, PageNumber::new(1).unwrap(), test_data(0x01));
    assert_eq!(
        err.unwrap_err(),
        MvccError::Busy,
        "DEFERRED upgrade must fail while another writer holds mutex"
    );
}
#[test]
// Serialized (IMMEDIATE) writers are mutually exclusive, so write skew is
// structurally impossible; a later writer sees all of the earlier commit.
fn test_serializable_behavior() {
    let m = mgr();
    let pgno_a = PageNumber::new(1).unwrap();
    let pgno_b = PageNumber::new(2).unwrap();
    let mut w1 = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut w1, pgno_a, test_data(0x01)).unwrap();
    m.write_page(&mut w1, pgno_b, test_data(0x02)).unwrap();
    assert_eq!(
        m.begin(BeginKind::Immediate).unwrap_err(),
        MvccError::Busy,
        "write skew impossible: second writer blocked"
    );
    m.commit(&mut w1).unwrap();
    // After w1 commits, the next writer observes both of its pages.
    let mut w2 = m.begin(BeginKind::Immediate).unwrap();
    let a = m.read_page(&mut w2, pgno_a).unwrap();
    let b = m.read_page(&mut w2, pgno_b).unwrap();
    assert_eq!(a.as_bytes()[0], 0x01, "w2 sees w1 page A");
    assert_eq!(b.as_bytes()[0], 0x02, "w2 sees w1 page B");
    m.commit(&mut w2).unwrap();
}
#[test]
// busy_timeout semantics: a writer blocked by a concurrent txn's page lock
// waits and acquires the lock once the holder aborts, rather than failing
// immediately or waiting out the full timeout. Timing bounds are loose
// (>= 20ms waited, < 500ms total) to tolerate scheduler jitter.
fn test_busy_timeout_wait() {
    let m = Arc::new(mgr_with_busy_timeout_ms(500));
    // A concurrent txn takes a page write lock and holds it briefly.
    let mut txn_conc = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn_conc, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    let m2 = Arc::clone(&m);
    // Background thread releases the lock after ~30ms by aborting.
    let releaser = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(30));
        let mut txn = txn_conc;
        m2.abort(&mut txn);
    });
    let start = Instant::now();
    let mut txn_ser = m.begin(BeginKind::Immediate).unwrap();
    let elapsed = start.elapsed();
    assert!(
        txn_ser.serialized_write_lock_held,
        "writer must eventually acquire lock after busy_timeout wait"
    );
    assert!(
        elapsed.as_millis() >= 20,
        "writer should have waited (elapsed: {}ms)",
        elapsed.as_millis()
    );
    assert!(
        elapsed.as_millis() < 500,
        "writer should succeed before full timeout (elapsed: {}ms)",
        elapsed.as_millis()
    );
    m.abort(&mut txn_ser);
    releaser.join().unwrap();
}
#[test]
// Nested savepoints: ROLLBACK TO a savepoint drops exactly the writes made
// after it, leaving earlier writes in the write set; rolling back to an
// outer savepoint also discards the inner one's writes.
fn test_savepoint_nested() {
    let m = mgr();
    let mut txn = m.begin(BeginKind::Immediate).unwrap();
    let p1 = PageNumber::new(1).unwrap();
    let p2 = PageNumber::new(2).unwrap();
    let p3 = PageNumber::new(3).unwrap();
    // Write p1, then sp1, write p2, then sp2, write p3.
    m.write_page(&mut txn, p1, test_data(0x01)).unwrap();
    let sp1 = TransactionManager::savepoint(&txn, "sp1");
    m.write_page(&mut txn, p2, test_data(0x02)).unwrap();
    let sp2 = TransactionManager::savepoint(&txn, "sp2");
    m.write_page(&mut txn, p3, test_data(0x03)).unwrap();
    assert_eq!(txn.write_set.len(), 3);
    // ROLLBACK TO sp2 drops only p3.
    TransactionManager::rollback_to_savepoint(&mut txn, &sp2);
    assert_eq!(txn.write_set.len(), 2);
    assert!(txn.write_set_data.contains_key(&p1));
    assert!(txn.write_set_data.contains_key(&p2));
    assert!(!txn.write_set_data.contains_key(&p3));
    // ROLLBACK TO sp1 additionally drops p2.
    TransactionManager::rollback_to_savepoint(&mut txn, &sp1);
    assert_eq!(txn.write_set.len(), 1);
    assert!(txn.write_set_data.contains_key(&p1));
    assert!(!txn.write_set_data.contains_key(&p2));
    m.commit(&mut txn).unwrap();
}
#[test]
// BEGIN CONCURRENT yields a Concurrent-mode txn with a snapshot taken at
// BEGIN and no interaction with the serialized writer mutex.
fn test_begin_concurrent_parsed() {
    let m = mgr();
    let txn = m.begin(BeginKind::Concurrent).unwrap();
    assert_eq!(
        txn.mode,
        TransactionMode::Concurrent,
        "BEGIN CONCURRENT must create Concurrent-mode transaction"
    );
    assert!(
        txn.snapshot_established,
        "concurrent txn establishes snapshot at BEGIN"
    );
    assert!(
        !txn.serialized_write_lock_held,
        "concurrent txn must NOT hold serialized writer mutex"
    );
    assert!(
        m.write_mutex().holder().is_none(),
        "concurrent txn must not touch the global write mutex"
    );
}
#[test]
// Disjoint-page CONCURRENT writers both commit and a later reader sees
// both committed values.
fn test_concurrent_disjoint_writes_both_commit() {
    let m = mgr();
    let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
    let p1 = PageNumber::new(1).unwrap();
    let p2 = PageNumber::new(2).unwrap();
    m.write_page(&mut t1, p1, test_data(0xAA)).unwrap();
    m.write_page(&mut t2, p2, test_data(0xBB)).unwrap();
    let seq1 = m.commit(&mut t1).unwrap();
    let seq2 = m.commit(&mut t2).unwrap();
    assert!(seq1 > CommitSeq::ZERO);
    assert!(seq2 > seq1, "t2 must commit after t1");
    // Fresh reader observes both writes.
    let mut reader = m.begin(BeginKind::Deferred).unwrap();
    let d1 = m.read_page(&mut reader, p1).unwrap();
    let d2 = m.read_page(&mut reader, p2).unwrap();
    assert_eq!(d1.as_bytes()[0], 0xAA);
    assert_eq!(d2.as_bytes()[0], 0xBB);
}
#[test]
// With default settings, the same-page conflict is detected eagerly at
// write_page time (page-lock Busy), not deferred to commit; the first
// writer still commits normally.
fn test_concurrent_same_page_first_committer_wins() {
    let m = mgr();
    let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
    let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
    let pgno = PageNumber::new(10).unwrap();
    m.write_page(&mut t1, pgno, test_data(0x11)).unwrap();
    // t2 hits t1's page write lock immediately.
    let err = m.write_page(&mut t2, pgno, test_data(0x22));
    assert_eq!(
        err.unwrap_err(),
        MvccError::Busy,
        "same-page concurrent write must fail with BUSY"
    );
    let seq = m.commit(&mut t1).unwrap();
    assert!(seq > CommitSeq::ZERO);
}
#[test]
// Classic write-skew shape (each txn reads what the other writes); the
// rw-antidependency flags are set manually to simulate the dangerous
// structure, and SSI must abort the committing transaction.
fn test_ssi_write_skew_detected_and_aborted() {
    let m = mgr();
    assert!(m.ssi_enabled(), "SSI must be on by default");
    let pa = PageNumber::new(1).unwrap();
    let pb = PageNumber::new(2).unwrap();
    // Seed both pages.
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, pa, test_data(0x10)).unwrap();
    m.write_page(&mut setup, pb, test_data(0x20)).unwrap();
    m.commit(&mut setup).unwrap();
    // t1 reads A, writes B; t2 reads B, writes A — crossed rw edges.
    let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut t1, pa);
    m.write_page(&mut t1, pb, test_data(0x21)).unwrap();
    let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut t2, pb);
    m.write_page(&mut t2, pa, test_data(0x11)).unwrap();
    // Mark t1 as the pivot with both in- and out-edges.
    t1.has_in_rw = true;
    t1.has_out_rw = true;
    let result = m.commit(&mut t1);
    assert_eq!(
        result.unwrap_err(),
        MvccError::BusySnapshot,
        "write skew must be detected and aborted under SSI"
    );
}
#[test]
// has_dangerous_structure() is true exactly when both rw-antidependency
// flags are set, and that state must abort the txn at commit time.
fn test_ssi_dangerous_structure_both_flags() {
    let m = mgr();
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    let pgno = PageNumber::new(1).unwrap();
    m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
    // Simulate a pivot transaction: incoming AND outgoing rw edges.
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert!(
        txn.has_dangerous_structure(),
        "both flags set must indicate dangerous structure"
    );
    let result = m.commit(&mut txn);
    assert_eq!(
        result.unwrap_err(),
        MvccError::BusySnapshot,
        "dangerous structure must abort at commit"
    );
    assert_eq!(txn.state, TransactionState::Aborted);
}
#[test]
// A single rw-antidependency edge (in OR out) is not dangerous; only the
// combination of both edges marks the dangerous structure.
fn test_ssi_rw_antidependency_tracking() {
    let m = mgr();
    let pgno = PageNumber::new(42).unwrap();
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, pgno, test_data(0x10)).unwrap();
    m.commit(&mut setup).unwrap();
    // t1 reads the page; t2 overwrites it and commits (rw edge t1 -> t2).
    let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
    let data = m.read_page(&mut t1, pgno);
    assert!(data.is_some());
    let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut t2, pgno, test_data(0x20)).unwrap();
    m.commit(&mut t2).unwrap();
    t1.has_in_rw = true;
    assert!(
        !t1.has_dangerous_structure(),
        "single rw edge must not trigger dangerous structure"
    );
    t1.has_out_rw = true;
    assert!(
        t1.has_dangerous_structure(),
        "both edges must trigger dangerous structure"
    );
}
#[test]
// With the serializable PRAGMA turned off (plain snapshot isolation), a
// txn carrying a dangerous structure still commits.
fn test_pragma_serializable_off_allows_skew() {
    let mut m = mgr();
    m.set_ssi_enabled(false);
    assert!(!m.ssi_enabled());
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    let pgno = PageNumber::new(1).unwrap();
    m.write_page(&mut txn, pgno, test_data(0x01)).unwrap();
    // Dangerous structure is present but must be ignored with SSI off.
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert!(txn.has_dangerous_structure());
    let seq = m.commit(&mut txn).unwrap();
    assert!(
        seq > CommitSeq::ZERO,
        "with PRAGMA fsqlite.serializable = OFF, write skew must be tolerated"
    );
}
#[test]
// CONCURRENT and serialized writers coexist on one manager: concurrent
// txns commit without the write mutex, a serialized writer then commits,
// and while a serialized writer is active a concurrent txn can still read
// but its writes are rejected with Busy.
//
// Fix: removed the dead `m2 = mgr_with_busy_timeout_ms(5)` fixture that
// was never exercised and was kept alive only via a trailing `let _ = &m2;`.
fn test_concurrent_mixed_with_serialized() {
    let m = mgr();
    // Two concurrent txns on disjoint pages both commit.
    let mut conc = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut conc, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    let mut conc2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut conc2, PageNumber::new(2).unwrap(), test_data(0x02))
        .unwrap();
    let seq1 = m.commit(&mut conc).unwrap();
    let seq2 = m.commit(&mut conc2).unwrap();
    assert!(seq1 > CommitSeq::ZERO);
    assert!(seq2 > seq1);
    // A serialized writer then commits normally.
    let mut ser = m.begin(BeginKind::Immediate).unwrap();
    assert!(ser.serialized_write_lock_held);
    m.write_page(&mut ser, PageNumber::new(3).unwrap(), test_data(0x03))
        .unwrap();
    m.commit(&mut ser).unwrap();
    // While a serialized writer is open, concurrent reads succeed but
    // concurrent writes are refused.
    let _active_writer = m.begin(BeginKind::Immediate).unwrap();
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    let read_result = m.read_page(&mut reader, PageNumber::new(1).unwrap());
    assert!(
        read_result.is_some(),
        "concurrent reader works during serialized writer"
    );
    let write_result = m.write_page(&mut reader, PageNumber::new(4).unwrap(), test_data(0x04));
    assert_eq!(
        write_result.unwrap_err(),
        MvccError::Busy,
        "concurrent write must fail while serialized writer is active"
    );
}
#[test]
// The serializable PRAGMA defaults to ON: a dangerous structure must be
// rejected at commit without any explicit configuration.
fn test_pragma_serializable_on_default() {
    let m = mgr();
    assert!(
        m.ssi_enabled(),
        "PRAGMA fsqlite.serializable must default to ON"
    );
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    // Simulated pivot: both rw edges set.
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert_eq!(
        m.commit(&mut txn).unwrap_err(),
        MvccError::BusySnapshot,
        "default ON must enforce SSI"
    );
}
#[test]
// Explicitly turning the PRAGMA off downgrades to plain snapshot
// isolation: the dangerous-structure txn commits.
fn test_pragma_serializable_off() {
    let mut m = mgr();
    m.set_ssi_enabled(false);
    assert!(!m.ssi_enabled());
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    let seq = m.commit(&mut txn).unwrap();
    assert!(
        seq > CommitSeq::ZERO,
        "OFF must allow write skew (plain SI)"
    );
}
#[test]
// Toggling the PRAGMA off and back on restores SSI enforcement for
// subsequently started transactions.
fn test_pragma_serializable_on() {
    let mut m = mgr();
    m.set_ssi_enabled(false);
    m.set_ssi_enabled(true);
    assert!(m.ssi_enabled());
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert_eq!(
        m.commit(&mut txn).unwrap_err(),
        MvccError::BusySnapshot,
        "ON must enforce SSI after being toggled back"
    );
}
#[test]
// The serializable PRAGMA is connection-scoped: disabling it on one
// manager must not affect another. conn_a (OFF) tolerates the dangerous
// structure; conn_b (still ON) aborts it.
fn test_pragma_scope_per_connection() {
    let mut conn_a = mgr();
    let conn_b = mgr();
    conn_a.set_ssi_enabled(false);
    assert!(!conn_a.ssi_enabled());
    assert!(
        conn_b.ssi_enabled(),
        "PRAGMA must be per-connection: conn_b unchanged"
    );
    let mut txn_a = conn_a.begin(BeginKind::Concurrent).unwrap();
    conn_a
        .write_page(&mut txn_a, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    txn_a.has_in_rw = true;
    txn_a.has_out_rw = true;
    assert!(
        conn_a.commit(&mut txn_a).is_ok(),
        "conn_a (OFF) must allow write skew"
    );
    let mut txn_b = conn_b.begin(BeginKind::Concurrent).unwrap();
    conn_b
        .write_page(&mut txn_b, PageNumber::new(1).unwrap(), test_data(0x02))
        .unwrap();
    txn_b.has_in_rw = true;
    txn_b.has_out_rw = true;
    assert_eq!(
        conn_b.commit(&mut txn_b).unwrap_err(),
        MvccError::BusySnapshot,
        "conn_b (ON) must enforce SSI"
    );
}
#[test]
// The PRAGMA setting persists across transactions within a session: two
// consecutive txns observe OFF, then turning it back ON affects the next
// transaction.
fn test_pragma_persists_in_session() {
    let mut m = mgr();
    m.set_ssi_enabled(false);
    let mut txn1 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn1, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    txn1.has_in_rw = true;
    txn1.has_out_rw = true;
    assert!(m.commit(&mut txn1).is_ok(), "txn1: OFF allows write skew");
    assert!(!m.ssi_enabled(), "PRAGMA must persist in session");
    let mut txn2 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn2, PageNumber::new(2).unwrap(), test_data(0x02))
        .unwrap();
    txn2.has_in_rw = true;
    txn2.has_out_rw = true;
    assert!(
        m.commit(&mut txn2).is_ok(),
        "txn2: OFF must persist across transactions"
    );
    // Re-enable: the following txn is aborted again.
    m.set_ssi_enabled(true);
    let mut txn3 = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn3, PageNumber::new(3).unwrap(), test_data(0x03))
        .unwrap();
    txn3.has_in_rw = true;
    txn3.has_out_rw = true;
    assert_eq!(
        m.commit(&mut txn3).unwrap_err(),
        MvccError::BusySnapshot,
        "txn3: ON must take effect for next transaction"
    );
}
#[test]
// A PRAGMA flip mid-transaction must not apply retroactively: a txn begun
// with SSI ON is still aborted even though SSI was disabled before commit.
fn test_pragma_not_retroactive_to_active_txn_on_to_off() {
    let mut m = mgr();
    assert!(m.ssi_enabled());
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert!(txn.has_dangerous_structure());
    // Disable SSI AFTER the txn began — must not rescue it.
    m.set_ssi_enabled(false);
    assert_eq!(
        m.commit(&mut txn).unwrap_err(),
        MvccError::BusySnapshot,
        "PRAGMA change must not be retroactive to an active txn"
    );
}
#[test]
// Symmetric case: a txn begun with SSI OFF commits even though SSI was
// re-enabled before commit — the setting at BEGIN governs the txn.
fn test_pragma_not_retroactive_to_active_txn_off_to_on() {
    let mut m = mgr();
    m.set_ssi_enabled(false);
    let mut txn = m.begin(BeginKind::Concurrent).unwrap();
    m.write_page(&mut txn, PageNumber::new(1).unwrap(), test_data(0x01))
        .unwrap();
    txn.has_in_rw = true;
    txn.has_out_rw = true;
    assert!(txn.has_dangerous_structure());
    // Enable SSI AFTER the txn began — must not condemn it.
    m.set_ssi_enabled(true);
    let seq = m.commit(&mut txn).unwrap();
    assert!(
        seq > CommitSeq::ZERO,
        "OFF-at-BEGIN must tolerate write skew"
    );
}
#[test]
// Bank-balance write-skew scenario: two accounts start at 50/50 with the
// application invariant a + b >= 0. Each txn withdraws 90 from one account
// after locally validating the invariant against its snapshot. Under SSI
// one writer must abort so the global invariant survives. Log output is
// captured to additionally check the abort diagnostics when emitted.
fn test_write_skew_sum_constraint() {
    let ((), logs) = with_tracing_capture(|| {
        let m = mgr();
        assert!(m.ssi_enabled(), "SSI must be enabled by default");
        let pa = PageNumber::new(1).unwrap();
        let pb = PageNumber::new(2).unwrap();
        // Seed both accounts with 50.
        let mut setup = m.begin(BeginKind::Immediate).unwrap();
        m.write_page(&mut setup, pa, test_i64(50)).unwrap();
        m.write_page(&mut setup, pb, test_i64(50)).unwrap();
        m.commit(&mut setup).unwrap();
        // t1 reads both, withdraws 90 from A.
        let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
        let a1 = decode_i64(&m.read_page(&mut t1, pa).unwrap());
        let b1 = decode_i64(&m.read_page(&mut t1, pb).unwrap());
        assert_eq!((a1, b1), (50, 50));
        m.write_page(&mut t1, pa, test_i64(a1 - 90)).unwrap();
        // t2 reads both, withdraws 90 from B.
        let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
        let a2 = decode_i64(&m.read_page(&mut t2, pa).unwrap());
        let b2 = decode_i64(&m.read_page(&mut t2, pb).unwrap());
        assert_eq!((a2, b2), (50, 50));
        m.write_page(&mut t2, pb, test_i64(b2 - 90)).unwrap();
        // Each txn's LOCAL view still satisfies the invariant.
        let sum1 = decode_i64(&m.read_page(&mut t1, pa).unwrap())
            + decode_i64(&m.read_page(&mut t1, pb).unwrap());
        let sum2 = decode_i64(&m.read_page(&mut t2, pa).unwrap())
            + decode_i64(&m.read_page(&mut t2, pb).unwrap());
        assert!(sum1 >= 0, "txn1 local constraint check must pass");
        assert!(sum2 >= 0, "txn2 local constraint check must pass");
        // t1 commits first; t2 (marked as pivot) must abort.
        let _ = m.commit(&mut t1).unwrap();
        t2.has_in_rw = true;
        t2.has_out_rw = true;
        assert_eq!(
            m.commit(&mut t2).unwrap_err(),
            MvccError::BusySnapshot,
            "SSI must abort one writer to prevent write skew"
        );
        // The committed state keeps the global invariant.
        let mut reader = m.begin(BeginKind::Deferred).unwrap();
        let a = decode_i64(&m.read_page(&mut reader, pa).unwrap());
        let b = decode_i64(&m.read_page(&mut reader, pb).unwrap());
        assert!(a + b >= 0, "global invariant must hold (a={a}, b={b})");
    });
    // If the commit path logged, the abort diagnostics must be present.
    if logs.contains("concurrent commit succeeded") {
        assert!(
            logs.contains("SSI abort: dangerous structure detected"),
            "expected abort log; logs={logs}"
        );
        assert!(logs.contains("conn_id="), "expected conn_id in logs");
    }
}
#[test]
// The same bank-balance scenario with SSI disabled: both withdrawals
// commit and the global invariant a + b >= 0 is violated — demonstrating
// the anomaly plain snapshot isolation permits.
fn test_write_skew_sum_constraint_serializable_off_allows_anomaly() {
    let ((), logs) = with_tracing_capture(|| {
        let mut m = mgr();
        m.set_ssi_enabled(false);
        assert!(!m.ssi_enabled());
        let pa = PageNumber::new(1).unwrap();
        let pb = PageNumber::new(2).unwrap();
        // Seed both accounts with 50.
        let mut setup = m.begin(BeginKind::Immediate).unwrap();
        m.write_page(&mut setup, pa, test_i64(50)).unwrap();
        m.write_page(&mut setup, pb, test_i64(50)).unwrap();
        m.commit(&mut setup).unwrap();
        // Crossed withdrawals from the same 50/50 snapshot.
        let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
        let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
        let a1 = decode_i64(&m.read_page(&mut t1, pa).unwrap());
        let b2 = decode_i64(&m.read_page(&mut t2, pb).unwrap());
        m.write_page(&mut t1, pa, test_i64(a1 - 90)).unwrap();
        m.write_page(&mut t2, pb, test_i64(b2 - 90)).unwrap();
        // Dangerous structure is flagged on both, but SSI is off.
        t1.has_in_rw = true;
        t1.has_out_rw = true;
        t2.has_in_rw = true;
        t2.has_out_rw = true;
        let _ = m.commit(&mut t1).unwrap();
        let _ = m.commit(&mut t2).unwrap();
        // Both committed: the invariant is now broken (-40 + -40 style).
        let mut reader = m.begin(BeginKind::Deferred).unwrap();
        let a = decode_i64(&m.read_page(&mut reader, pa).unwrap());
        let b = decode_i64(&m.read_page(&mut reader, pb).unwrap());
        assert!(
            a + b < 0,
            "expected anomaly under SI (a={a}, b={b}, sum={})",
            a + b
        );
    });
    // When commits logged, the PRAGMA change must have been logged too.
    if logs.contains("concurrent commit succeeded") {
        assert!(
            logs.contains("PRAGMA fsqlite.serializable changed"),
            "expected PRAGMA log; logs={logs}"
        );
        assert!(logs.contains("conn_id="), "expected conn_id in logs");
    }
}
#[test]
// Mutual-exclusion write skew: each txn reads the flag page the other one
// writes. The committing txn carrying both rw-antidependency edges must be
// aborted under SSI.
fn test_write_skew_mutual_exclusion() {
    let m = mgr();
    let flag_a = PageNumber::new(10).unwrap();
    let flag_b = PageNumber::new(11).unwrap();
    // Seed both flag pages.
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, flag_a, test_data(0x10)).unwrap();
    m.write_page(&mut setup, flag_b, test_data(0x20)).unwrap();
    m.commit(&mut setup).unwrap();
    // first: reads A, writes B.
    let mut first = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut first, flag_a);
    m.write_page(&mut first, flag_b, test_data(0x21)).unwrap();
    // second: reads B, writes A — the crossed dependency.
    let mut second = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut second, flag_b);
    m.write_page(&mut second, flag_a, test_data(0x11)).unwrap();
    first.has_in_rw = true;
    first.has_out_rw = true;
    let outcome = m.commit(&mut first);
    assert_eq!(
        outcome.unwrap_err(),
        MvccError::BusySnapshot,
        "mutual exclusion write skew: T1 must be aborted"
    );
}
#[test]
// Three-way dependency cycle: t1 reads p1/writes p2, t2 reads p2/writes
// p3, t3 reads p3/writes p1. All three are flagged as pivots; SSI must
// abort at least one of them to break the cycle.
fn test_write_skew_three_way() {
    let m = mgr();
    let p1 = PageNumber::new(20).unwrap();
    let p2 = PageNumber::new(21).unwrap();
    let p3 = PageNumber::new(22).unwrap();
    // Seed all three pages.
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, p1, test_data(0x01)).unwrap();
    m.write_page(&mut setup, p2, test_data(0x02)).unwrap();
    m.write_page(&mut setup, p3, test_data(0x03)).unwrap();
    m.commit(&mut setup).unwrap();
    let mut t1 = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut t1, p1);
    m.write_page(&mut t1, p2, test_data(0x12)).unwrap();
    let mut t2 = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut t2, p2);
    m.write_page(&mut t2, p3, test_data(0x23)).unwrap();
    let mut t3 = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut t3, p3);
    m.write_page(&mut t3, p1, test_data(0x31)).unwrap();
    // Mark every txn as a potential pivot.
    t1.has_in_rw = true;
    t1.has_out_rw = true;
    t2.has_in_rw = true;
    t2.has_out_rw = true;
    t3.has_in_rw = true;
    t3.has_out_rw = true;
    // Count aborts; SSI may abort more than one, but never zero.
    let mut aborted = 0_u32;
    if m.commit(&mut t1).is_err() {
        aborted += 1;
    }
    if m.commit(&mut t2).is_err() {
        aborted += 1;
    }
    if m.commit(&mut t3).is_err() {
        aborted += 1;
    }
    assert!(
        aborted >= 1,
        "three-way write skew: at least one transaction must abort"
    );
}
#[test]
// A read-only CONCURRENT txn can never form a dangerous structure, and
// committing it publishes nothing: the commit seq returned is ZERO.
fn test_write_skew_read_only_anomaly() {
    let m = mgr();
    let pgno = PageNumber::new(30).unwrap();
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, pgno, test_data(0x01)).unwrap();
    m.commit(&mut setup).unwrap();
    let mut reader = m.begin(BeginKind::Concurrent).unwrap();
    let data = m.read_page(&mut reader, pgno);
    assert!(data.is_some());
    assert!(
        !reader.has_dangerous_structure(),
        "read-only txn cannot have dangerous structure"
    );
    let seq = m.commit(&mut reader).unwrap();
    assert_eq!(
        seq,
        CommitSeq::ZERO,
        "read-only commit returns ZERO seq (no writes published)"
    );
}
#[test]
// In serialized (IMMEDIATE) mode write skew cannot arise at all: writers
// are mutually exclusive, so the second writer always sees the first
// writer's committed data.
fn test_no_write_skew_under_serialized_mode() {
    let m = mgr();
    let pa = PageNumber::new(40).unwrap();
    let pb = PageNumber::new(41).unwrap();
    let mut setup = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut setup, pa, test_data(50)).unwrap();
    m.write_page(&mut setup, pb, test_data(50)).unwrap();
    m.commit(&mut setup).unwrap();
    // t1 reads both pages then writes A; any second writer is locked out.
    let mut t1 = m.begin(BeginKind::Immediate).unwrap();
    let _ = m.read_page(&mut t1, pa);
    let _ = m.read_page(&mut t1, pb);
    m.write_page(&mut t1, pa, test_data(0xD8)).unwrap();
    assert_eq!(
        m.begin(BeginKind::Immediate).unwrap_err(),
        MvccError::Busy,
        "serialized mode prevents concurrent writers entirely"
    );
    m.commit(&mut t1).unwrap();
    // t2 starts only after t1 committed, so it observes 0xD8.
    let mut t2 = m.begin(BeginKind::Immediate).unwrap();
    let a = m.read_page(&mut t2, pa).unwrap();
    assert_eq!(
        a.as_bytes()[0],
        0xD8,
        "T2 sees T1's committed data — no write skew possible"
    );
    m.commit(&mut t2).unwrap();
}
#[test]
// Write skew routed through a shared index page: both txns read the index
// and the sibling data page, then write the other data page. The pivot
// transaction must be aborted under SSI.
fn test_write_skew_with_indexes() {
    let m = mgr();
    let index_page = PageNumber::new(50).unwrap();
    let row_a = PageNumber::new(51).unwrap();
    let row_b = PageNumber::new(52).unwrap();
    // Seed index and both rows.
    let mut seed = m.begin(BeginKind::Immediate).unwrap();
    m.write_page(&mut seed, index_page, test_data(0xFF)).unwrap();
    m.write_page(&mut seed, row_a, test_data(0x0A)).unwrap();
    m.write_page(&mut seed, row_b, test_data(0x0B)).unwrap();
    m.commit(&mut seed).unwrap();
    // left txn: reads index + row A, writes row B.
    let mut left = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut left, index_page);
    let _ = m.read_page(&mut left, row_a);
    m.write_page(&mut left, row_b, test_data(0x1B)).unwrap();
    // right txn: reads index + row B, writes row A.
    let mut right = m.begin(BeginKind::Concurrent).unwrap();
    let _ = m.read_page(&mut right, index_page);
    let _ = m.read_page(&mut right, row_b);
    m.write_page(&mut right, row_a, test_data(0x2A)).unwrap();
    left.has_in_rw = true;
    left.has_out_rw = true;
    let outcome = m.commit(&mut left);
    assert_eq!(
        outcome.unwrap_err(),
        MvccError::BusySnapshot,
        "indexed write skew must be detected"
    );
}
#[test]
// Alias: serialized-mode behavior under the mvcc_* naming scheme.
fn test_mvcc_serialized_mode() {
    test_serializable_behavior();
}
#[test]
// Alias: disjoint-page concurrent commits under the mvcc_* naming scheme.
fn test_mvcc_concurrent_different_pages() {
    test_concurrent_disjoint_writes_both_commit();
}
#[test]
// Alias: same-page concurrent conflict under the mvcc_* naming scheme.
fn test_mvcc_concurrent_same_page_conflict() {
    test_concurrent_same_page_first_committer_wins();
}
#[test]
// Stress test: 100 threads each commit 100 single-page CONCURRENT txns on
// disjoint page ranges (pages 10_000..20_000), then a single reader
// verifies all 10,000 pages are visible. The manager is shared behind a
// Mutex because begin/write/commit take &self but the threads must share
// one instance.
fn test_mvcc_100_threads_100_rows() {
    let manager = Arc::new(Mutex::new(mgr()));
    let handles: Vec<_> = (0_u32..100)
        .map(|thread_id| {
            let manager = Arc::clone(&manager);
            std::thread::spawn(move || {
                for row in 0_u32..100 {
                    // Disjoint page per (thread, row) pair: no conflicts.
                    let pgno_raw = 10_000 + thread_id * 100 + row;
                    let pgno = PageNumber::new(pgno_raw).expect("page number in range");
                    let payload = u8::try_from((row + thread_id) % 251).expect("bounded");
                    let guard = manager.lock().expect("manager lock");
                    let mut txn = guard
                        .begin(BeginKind::Concurrent)
                        .expect("begin concurrent");
                    guard
                        .write_page(&mut txn, pgno, test_data(payload))
                        .expect("write");
                    guard.commit(&mut txn).expect("commit");
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    // Single reader sweep: every committed page must resolve.
    let guard = manager.lock().expect("manager lock");
    let mut reader = guard.begin(BeginKind::Deferred).expect("begin reader");
    let mut present = 0usize;
    for thread_id in 0_u32..100 {
        for row in 0_u32..100 {
            let pgno_raw = 10_000 + thread_id * 100 + row;
            let pgno = PageNumber::new(pgno_raw).expect("page number in range");
            if guard.read_page(&mut reader, pgno).is_some() {
                present += 1;
            }
        }
    }
    assert_eq!(present, 10_000, "expected 10,000 committed page writes");
}
#[test]
// Alias: long-lived reader snapshot stability (theorem 2 test).
fn test_snapshot_isolation_long_reader() {
    test_theorem2_snapshot_isolation_all_or_nothing_visibility();
}
#[test]
// Alias: new readers see prior commits (theorem 1 test).
fn test_snapshot_isolation_new_reader() {
    test_theorem1_committed_writes_visible_in_later_snapshots();
}
#[test]
// Alias: SSI write-skew abort.
fn test_ssi_write_skew_abort() {
    test_ssi_write_skew_detected_and_aborted();
}
#[test]
// Alias: SSI off tolerates the skew.
fn test_ssi_non_serializable_allows() {
    test_pragma_serializable_off_allows_skew();
}
#[test]
// Alias: rw-antidependency flag tracking.
fn test_ssi_rw_flags() {
    test_ssi_rw_antidependency_tracking();
}
#[test]
// Alias: rebase succeeds for disjoint edits.
fn test_rebase_merge_distinct_keys() {
    test_conflict_with_successful_rebase();
}
#[test]
// Alias: rebase refuses overlapping edits.
fn test_rebase_merge_same_key_abort() {
    test_conflict_response_sqlite_busy();
}
#[test]
// Beginning a transaction pins exactly one EBR version guard, which shows
// up in the registry's active-guard count.
fn test_version_guard_pinned_at_begin() {
    let manager = TransactionManager::new(PageSize::new(4096).unwrap());
    let txn = manager.begin(BeginKind::Concurrent).unwrap();
    assert!(
        txn.has_version_guard(),
        "VersionGuard must be pinned at begin"
    );
    assert_eq!(manager.version_guard_registry().active_guard_count(), 1);
}
#[test]
// Commit releases the transaction's version guard: the guard handle is
// cleared and the registry count drops back to zero.
fn test_version_guard_unpinned_on_commit() {
    let mgr = TransactionManager::new(PageSize::new(4096).unwrap());
    let mut txn = mgr.begin(BeginKind::Concurrent).unwrap();
    assert_eq!(mgr.version_guard_registry().active_guard_count(), 1);
    // Commit outcome itself is irrelevant here; only guard release matters.
    let _ = mgr.commit(&mut txn);
    assert!(
        !txn.has_version_guard(),
        "VersionGuard must be unpinned after commit"
    );
    assert_eq!(mgr.version_guard_registry().active_guard_count(), 0);
}
#[test]
// Abort releases the version guard just like commit: handle cleared,
// registry count back to zero.
fn test_version_guard_unpinned_on_abort() {
    let manager = TransactionManager::new(PageSize::new(4096).unwrap());
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    assert_eq!(manager.version_guard_registry().active_guard_count(), 1);
    manager.abort(&mut txn);
    assert!(
        !txn.has_version_guard(),
        "VersionGuard must be unpinned after abort"
    );
    assert_eq!(manager.version_guard_registry().active_guard_count(), 0);
}
#[test]
// Every BEGIN kind (Concurrent, Deferred, Immediate) pins a version guard,
// and aborting each leaves zero active guards.
fn test_version_guard_pinned_for_all_begin_kinds() {
    let mgr = TransactionManager::new(PageSize::new(4096).unwrap());
    let mut txn = mgr.begin(BeginKind::Concurrent).unwrap();
    assert!(txn.has_version_guard());
    mgr.abort(&mut txn);
    let mut txn = mgr.begin(BeginKind::Deferred).unwrap();
    assert!(txn.has_version_guard());
    mgr.abort(&mut txn);
    let mut txn = mgr.begin(BeginKind::Immediate).unwrap();
    assert!(txn.has_version_guard());
    mgr.abort(&mut txn);
    assert_eq!(mgr.version_guard_registry().active_guard_count(), 0);
}
#[test]
// Each concurrently open transaction pins its own guard: three open txns
// mean three active guards in the registry.
fn test_multiple_concurrent_txns_pin_separate_guards() {
    let mgr = TransactionManager::new(PageSize::new(4096).unwrap());
    let txn1 = mgr.begin(BeginKind::Concurrent).unwrap();
    let txn2 = mgr.begin(BeginKind::Concurrent).unwrap();
    let txn3 = mgr.begin(BeginKind::Concurrent).unwrap();
    assert_eq!(mgr.version_guard_registry().active_guard_count(), 3);
    assert!(txn1.has_version_guard());
    assert!(txn2.has_version_guard());
    assert!(txn3.has_version_guard());
    // Explicit drop of one txn; the remaining txns drop at scope end.
    drop(txn1);
}
#[test]
fn test_version_guard_defer_retire_returns_true_when_pinned() {
    // With the guard pinned at begin, deferring a retirement is accepted.
    let manager = TransactionManager::new(PageSize::new(4096).unwrap());
    let pinned = manager.begin(BeginKind::Concurrent).unwrap();
    let accepted = pinned.defer_retire_version(vec![1_u8, 2, 3]);
    assert!(
        accepted,
        "defer_retire_version must return true when guard is pinned"
    );
}
#[test]
// Commit a first version of a page, capture a snapshot, then commit a
// replacement. The superseded version must remain resolvable through the
// older snapshot (chain length 2) until GC reclaims it.
fn test_publish_write_set_keeps_superseded_version_visible_until_gc() {
let mgr = TransactionManager::new(PageSize::new(4096).unwrap());
let pgno = PageNumber::new(6_001).expect("valid page number");
let mut txn1 = mgr.begin(BeginKind::Concurrent).unwrap();
let first_data = test_data(0x11);
mgr.write_page(&mut txn1, pgno, first_data.clone()).unwrap();
let first_commit = mgr.commit(&mut txn1).unwrap();
let mut txn2 = mgr.begin(BeginKind::Concurrent).unwrap();
// Snapshot captured before the second commit: it must keep resolving
// to the first committed payload even after the replacement lands.
let snapshot_before_second_commit = txn2.snapshot;
let second_data = test_data(0x22);
mgr.write_page(&mut txn2, pgno, second_data.clone())
.unwrap();
let second_commit = mgr.commit(&mut txn2).unwrap();
assert_eq!(
mgr.version_store().chain_length(pgno),
2,
"publishing a replacement version must keep the superseded version in the chain until GC"
);
// Old snapshot -> first version's data.
let version_before_second_commit = mgr
.version_store()
.resolve(pgno, &snapshot_before_second_commit)
.expect("snapshot taken before second commit should still resolve the first version");
assert_eq!(
mgr.version_store()
.get_version(version_before_second_commit)
.expect("resolved version")
.data,
first_data
);
// A snapshot at the second commit seq -> replacement version's data.
let latest_version = mgr
.version_store()
.resolve(pgno, &Snapshot::new(second_commit, SchemaEpoch::ZERO))
.expect("latest snapshot should resolve the replacement version");
assert!(
second_commit > first_commit,
"replacement commit should advance the global commit sequence"
);
assert_eq!(
mgr.version_store()
.get_version(latest_version)
.expect("latest resolved version")
.data,
second_data
);
// Both writers committed, so no guards may remain pinned.
assert_eq!(mgr.version_guard_registry().active_guard_count(), 0);
}
#[test]
fn test_version_guard_defer_retire_returns_false_without_guard() {
    // Construct a raw Transaction directly, bypassing the manager, so no
    // VersionGuard is ever pinned — deferral must then be rejected.
    let bare = Transaction::new(
        TxnId::new(1).expect("TxnId::new(1) should be valid"),
        TxnEpoch::new(0),
        Snapshot::new(CommitSeq::ZERO, SchemaEpoch::ZERO),
        TransactionMode::Concurrent,
    );
    assert!(
        !bare.defer_retire_version(42_u64),
        "defer_retire_version must return false when no guard is pinned"
    );
}
#[test]
// A value deferred while the guard is pinned must be dropped (reclaimed by
// crossbeam-epoch) only after commit unpins the guard and the global epoch
// advances.
fn test_version_guard_deferred_value_freed_after_unpin() {
use crossbeam_epoch as epoch;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
// Counts drops so reclamation of the deferred value is observable.
#[derive(Clone)]
struct DropTracker(Arc<AtomicUsize>);
impl Drop for DropTracker {
fn drop(&mut self) {
self.0.fetch_add(1, AtomicOrdering::SeqCst);
}
}
let mgr = TransactionManager::new(PageSize::new(4096).unwrap());
let mut txn = mgr.begin(BeginKind::Concurrent).unwrap();
let drop_count = Arc::new(AtomicUsize::new(0));
txn.defer_retire_version(DropTracker(Arc::clone(&drop_count)));
// While the guard is still pinned the deferred value must NOT be dropped.
assert_eq!(drop_count.load(AtomicOrdering::SeqCst), 0);
let _ = mgr.commit(&mut txn);
// After commit unpins the guard, repeatedly pin/flush to push the global
// epoch forward until reclamation fires, bounded by a 2s deadline.
let deadline = Instant::now() + Duration::from_secs(2);
while drop_count.load(AtomicOrdering::SeqCst) == 0 && Instant::now() < deadline {
let g = epoch::pin();
g.flush();
std::thread::yield_now();
}
assert_eq!(
drop_count.load(AtomicOrdering::SeqCst),
1,
"deferred value must be freed after guard unpin + epoch advance"
);
}
#[test]
// Same reclamation check as the commit variant, but the transaction ABORTS:
// abort must also unpin the guard so the deferred value can be reclaimed.
fn test_version_guard_deferred_value_freed_after_abort() {
use crossbeam_epoch as epoch;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
// Counts drops so reclamation of the deferred value is observable.
#[derive(Clone)]
struct DropTracker(Arc<AtomicUsize>);
impl Drop for DropTracker {
fn drop(&mut self) {
self.0.fetch_add(1, AtomicOrdering::SeqCst);
}
}
let mgr = TransactionManager::new(PageSize::new(4096).unwrap());
let mut txn = mgr.begin(BeginKind::Concurrent).unwrap();
let drop_count = Arc::new(AtomicUsize::new(0));
assert_eq!(mgr.version_guard_registry().active_guard_count(), 1);
assert!(
txn.defer_retire_version(DropTracker(Arc::clone(&drop_count))),
"defer_retire_version should succeed while guard is pinned"
);
mgr.abort(&mut txn);
assert!(!txn.has_version_guard(), "abort must drop version guard");
assert_eq!(mgr.version_guard_registry().active_guard_count(), 0);
// Pin/flush cycles advance the global epoch until crossbeam reclaims the
// deferred value, bounded by a 2s deadline.
let deadline = Instant::now() + Duration::from_secs(2);
while drop_count.load(AtomicOrdering::SeqCst) == 0 && Instant::now() < deadline {
let g = epoch::pin();
g.flush();
std::thread::yield_now();
}
assert_eq!(
drop_count.load(AtomicOrdering::SeqCst),
1,
"aborted transaction retirements must be reclaimed after unpin + epoch advance"
);
}
#[test]
// 8 threads x 4 writes each, all targeting one page, with retry-on-busy.
// Every successful commit must leave exactly one version in the chain
// (no lost updates): final chain length == total commits + the seed.
fn test_concurrent_same_page_writers_preserve_all_committed_versions() {
let mut m = mgr_with_busy_timeout_ms(25);
m.set_max_chain_length(128);
// Pre-seed the chain-length EWMA so the observed-length estimate starts
// high; presumably this keeps adaptive compaction from trimming versions
// mid-test — TODO confirm against adaptive_compact_threshold().
m.chain_ewma_x256
.store(32_u64 * 256, std::sync::atomic::Ordering::Relaxed);
let mgr = Arc::new(m);
let pgno = PageNumber::new(6_777).unwrap();
let workers = 8usize;
let writes_per_worker = 4usize;
// Seed one committed version before spawning the writers.
let mut seed = mgr.begin(BeginKind::Concurrent).unwrap();
mgr.write_page(&mut seed, pgno, test_data(0x00)).unwrap();
mgr.commit(&mut seed).unwrap();
// Barrier releases every worker at the same instant to maximize contention.
let start = Arc::new(std::sync::Barrier::new(workers));
let mut handles = Vec::with_capacity(workers);
for worker in 0..workers {
let mgr_clone = Arc::clone(&mgr);
let start_clone = Arc::clone(&start);
handles.push(std::thread::spawn(move || {
start_clone.wait();
let mut committed = 0usize;
for step in 0..writes_per_worker {
// Retry loop: begin/write/commit until this step lands.
loop {
let mut txn = mgr_clone.begin(BeginKind::Concurrent).unwrap();
// Distinct payload byte per (worker, step), folded into 0..251.
let payload =
u8::try_from((worker * writes_per_worker + step) % 251).unwrap();
match mgr_clone.write_page(&mut txn, pgno, test_data(payload)) {
Ok(()) => match mgr_clone.commit(&mut txn) {
Ok(_) => {
committed += 1;
break;
}
// NOTE(review): unlike the write-Busy arm below, the
// failed txn is not explicitly aborted before retrying —
// presumably a failed commit releases its own snapshot
// and guard; confirm against TransactionManager::commit.
Err(MvccError::BusySnapshot) => {
std::thread::yield_now();
}
Err(err) => panic!("unexpected commit error: {err:?}"),
},
Err(MvccError::Busy) => {
mgr_clone.abort(&mut txn);
std::thread::yield_now();
}
Err(err) => panic!("unexpected write error: {err:?}"),
}
}
}
committed
}));
}
let total_committed = handles
.into_iter()
.map(|handle| handle.join().expect("writer thread should not panic"))
.sum::<usize>();
assert_eq!(
total_committed,
workers * writes_per_worker,
"each worker operation should eventually commit after retries"
);
// One version per successful commit, plus the pre-spawn seed version.
let chain_len = mgr.version_store().walk_chain(pgno).len();
assert_eq!(
chain_len,
total_committed + 1,
"same-page concurrent commits should retain one version per successful commit plus seed"
);
}
#[test]
fn test_chain_length_bounded_after_10000_updates_same_page() {
    // Hammer a single page with 10k committed updates; eager GC must keep
    // the version chain within the configured maximum the whole time.
    let mut manager = mgr();
    manager.set_max_chain_length(64);
    manager.set_chain_length_warning(32);
    let pgno = PageNumber::new(6_778).unwrap();
    let metrics_before = GLOBAL_EBR_METRICS.snapshot();
    for step in 0_u32..10_000_u32 {
        let mut writer = manager.begin(BeginKind::Concurrent).unwrap();
        let payload = test_data(u8::try_from(step % 251).unwrap());
        manager.write_page(&mut writer, pgno, payload).unwrap();
        manager.commit(&mut writer).unwrap();
    }
    let observed = manager.version_store().chain_length(pgno);
    assert!(
        observed <= manager.max_chain_length(),
        "chain length {} exceeded configured max {}",
        observed,
        manager.max_chain_length()
    );
    let metrics_after = GLOBAL_EBR_METRICS.snapshot();
    // Sustained same-page churn must have triggered at least one GC free.
    assert!(
        metrics_after.gc_freed_count > metrics_before.gc_freed_count,
        "expected eager GC frees during sustained same-page updates"
    );
    assert!(
        metrics_after.max_chain_length_observed >= metrics_before.max_chain_length_observed,
        "max observed chain length should be monotonic"
    );
}
#[test]
fn test_chain_soft_bound_allows_exceeding_max_chain_length() {
    let mut manager = mgr();
    manager.set_max_chain_length(4);
    manager.set_chain_length_warning(2);
    let pgno = PageNumber::new(6_779).unwrap();
    // Seed the page with one committed version.
    let mut seed = manager.begin(BeginKind::Concurrent).unwrap();
    manager.write_page(&mut seed, pgno, test_data(0x01)).unwrap();
    manager.commit(&mut seed).unwrap();
    // An open reader pins the GC horizon so the chain cannot be trimmed.
    let mut pinned_reader = manager.begin(BeginKind::Concurrent).unwrap();
    let _ = manager.read_page(&mut pinned_reader, pgno);
    // Twelve commits push the chain past max_chain_length (4) while staying
    // below the hard limit (SOFT_BOUND_MULTIPLIER * 4 = 16): all must pass.
    let mut commits_in_soft_range = 0_u32;
    for step in 0_u32..12_u32 {
        let mut writer = manager.begin(BeginKind::Concurrent).unwrap();
        let payload = test_data(u8::try_from((step + 2) % 251).unwrap());
        manager.write_page(&mut writer, pgno, payload).unwrap();
        match manager.commit(&mut writer) {
            Ok(_) => commits_in_soft_range += 1,
            Err(MvccError::Busy) => break,
            Err(other) => panic!("unexpected commit error: {other:?}"),
        }
    }
    assert_eq!(
        commits_in_soft_range, 12,
        "C7 soft-bound: commits between max_chain_length and hard_limit should succeed"
    );
    manager.abort(&mut pinned_reader);
}
#[test]
fn test_chain_hard_limit_triggers_busy_when_horizon_pinned() {
    let mut manager = mgr();
    manager.set_max_chain_length(4);
    manager.set_chain_length_warning(2);
    let pgno = PageNumber::new(6_780).unwrap();
    let metrics_before = GLOBAL_EBR_METRICS.snapshot();
    // Seed one version, then pin the GC horizon with an open reader so the
    // chain cannot be trimmed while writers keep appending.
    let mut seed = manager.begin(BeginKind::Concurrent).unwrap();
    manager.write_page(&mut seed, pgno, test_data(0x01)).unwrap();
    manager.commit(&mut seed).unwrap();
    let mut pinned_reader = manager.begin(BeginKind::Concurrent).unwrap();
    let _ = manager.read_page(&mut pinned_reader, pgno);
    // Keep committing until the chain outgrows the hard limit; with GC
    // blocked, commit must eventually report Busy.
    let mut saw_busy = false;
    for step in 0_u32..32_u32 {
        let mut writer = manager.begin(BeginKind::Concurrent).unwrap();
        let payload = test_data(u8::try_from((step + 2) % 251).unwrap());
        manager.write_page(&mut writer, pgno, payload).unwrap();
        match manager.commit(&mut writer) {
            Ok(_) => {}
            Err(MvccError::Busy) => {
                saw_busy = true;
                break;
            }
            Err(other) => panic!("unexpected commit error: {other:?}"),
        }
    }
    assert!(
        saw_busy,
        "expected MvccError::Busy when chain exceeds hard limit (SOFT_BOUND_MULTIPLIER * max_chain_length)"
    );
    manager.abort(&mut pinned_reader);
    let metrics_after = GLOBAL_EBR_METRICS.snapshot();
    assert!(
        metrics_after.gc_blocked_count > metrics_before.gc_blocked_count,
        "gc_blocked metric should increment when hard limit exceeded"
    );
}
#[test]
fn test_adaptive_compact_threshold_handles_small_max_chain_length() {
    let mut manager = mgr();
    // Small max_chain_length paired with a large EWMA: the threshold must
    // collapse to max_chain_length / 2 rather than panic or return 0.
    manager.set_max_chain_length(4);
    manager
        .chain_ewma_x256
        .store(32_u64 * 256, std::sync::atomic::Ordering::Relaxed);
    assert_eq!(
        manager.adaptive_compact_threshold(),
        2,
        "threshold should collapse to max_chain_length/2 instead of panicking"
    );
    // Degenerate single-version configuration must still be positive.
    manager.set_max_chain_length(1);
    manager
        .chain_ewma_x256
        .store(128_u64 * 256, std::sync::atomic::Ordering::Relaxed);
    assert_eq!(
        manager.adaptive_compact_threshold(),
        1,
        "single-version chains should still yield a valid positive threshold"
    );
}
#[test]
fn test_cached_gc_horizon_tracks_snapshot_lifecycle() {
    let manager = mgr();
    let pgno = PageNumber::new(6_780).unwrap();
    // No transactions yet: nothing pins the horizon.
    assert!(manager.cached_gc_horizon().is_none());
    let mut deferred = manager.begin(BeginKind::Deferred).unwrap();
    assert!(
        manager.cached_gc_horizon().is_none(),
        "deferred txns should not pin horizon before first read"
    );
    // The first read establishes the snapshot and pins the horizon.
    let _ = manager.read_page(&mut deferred, pgno);
    assert!(
        manager.cached_gc_horizon().is_some(),
        "first read should register active snapshot horizon"
    );
    manager.abort(&mut deferred);
    assert!(
        manager.cached_gc_horizon().is_none(),
        "releasing last active snapshot should clear cached horizon"
    );
}
#[test]
fn test_cached_gc_horizon_tracks_deferred_first_write_lifecycle() {
    let manager = mgr();
    let pgno = PageNumber::new(6_784).unwrap();
    assert!(manager.cached_gc_horizon().is_none());
    let mut deferred = manager.begin(BeginKind::Deferred).unwrap();
    // For a deferred txn, the first WRITE (not only a read) upgrades it and
    // establishes the snapshot that pins the GC horizon.
    manager
        .write_page(&mut deferred, pgno, test_data(0x7A))
        .expect("first deferred write should upgrade and establish a snapshot");
    assert_eq!(
        manager.cached_gc_horizon(),
        Some(deferred.snapshot.high),
        "first deferred write should register the active snapshot horizon"
    );
    manager.abort(&mut deferred);
    assert!(
        manager.cached_gc_horizon().is_none(),
        "aborting the last deferred writer should release the cached horizon"
    );
}
#[test]
fn test_cached_gc_horizon_retains_duplicate_snapshot_highs_until_last_release() {
    let manager = mgr();
    let pgno = PageNumber::new(6_781).unwrap();
    let mut reader_a = manager.begin(BeginKind::Deferred).unwrap();
    let mut reader_b = manager.begin(BeginKind::Deferred).unwrap();
    let _ = manager.read_page(&mut reader_a, pgno);
    let first_horizon = manager
        .cached_gc_horizon()
        .expect("first active snapshot should populate cached horizon");
    // With no intervening commit, the second reader observes the same
    // snapshot high as the first.
    let _ = manager.read_page(&mut reader_b, pgno);
    assert_eq!(
        manager.cached_gc_horizon(),
        Some(first_horizon),
        "same snapshot high should keep the cached minimum unchanged"
    );
    // Releasing one of two identical highs must keep the horizon pinned.
    manager.abort(&mut reader_a);
    assert_eq!(
        manager.cached_gc_horizon(),
        Some(first_horizon),
        "remaining txn with identical snapshot high must keep horizon pinned"
    );
    manager.abort(&mut reader_b);
    assert!(
        manager.cached_gc_horizon().is_none(),
        "cached horizon should clear only after the last duplicate snapshot releases"
    );
}
#[test]
fn test_cached_gc_horizon_advances_to_next_snapshot_without_full_clear() {
    let manager = mgr();
    let read_pgno = PageNumber::new(6_782).unwrap();
    let write_pgno = PageNumber::new(6_783).unwrap();
    // Oldest reader pins the initial horizon.
    let mut oldest = manager.begin(BeginKind::Deferred).unwrap();
    let _ = manager.read_page(&mut oldest, read_pgno);
    let oldest_horizon = oldest.snapshot.high;
    // An intervening commit advances the global commit sequence.
    let mut writer = manager.begin(BeginKind::Concurrent).unwrap();
    manager
        .write_page(&mut writer, write_pgno, PageData::zeroed(PageSize::DEFAULT))
        .expect("writer should stage page");
    manager.commit(&mut writer).expect("writer should commit");
    // A later reader therefore sees a strictly higher snapshot high.
    let mut newer = manager.begin(BeginKind::Deferred).unwrap();
    let _ = manager.read_page(&mut newer, read_pgno);
    let newer_horizon = newer.snapshot.high;
    assert!(
        newer_horizon > oldest_horizon,
        "later txn should observe an advanced snapshot high after intervening commit"
    );
    assert_eq!(
        manager.cached_gc_horizon(),
        Some(oldest_horizon),
        "oldest active snapshot should pin the minimum horizon"
    );
    // Releasing the oldest snapshot advances — not clears — the horizon.
    manager.abort(&mut oldest);
    assert_eq!(
        manager.cached_gc_horizon(),
        Some(newer_horizon),
        "releasing the oldest snapshot should advance horizon to the next active snapshot"
    );
    manager.abort(&mut newer);
    assert!(
        manager.cached_gc_horizon().is_none(),
        "cached horizon should clear after all active snapshots release"
    );
}
#[test]
fn test_version_guard_registry_accessor() {
    // A freshly-created manager exposes a registry with zero active guards.
    let manager = TransactionManager::new(PageSize::new(4096).unwrap());
    assert_eq!(manager.version_guard_registry().active_guard_count(), 0);
}
#[test]
fn test_commit_structural_page_uses_page_level_mvcc() {
    let manager = mgr();
    let pgno = PageNumber::new(100).unwrap();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    let payload = test_data(0xAB);
    manager
        .write_page(&mut txn, pgno, payload.clone())
        .expect("write should succeed");
    // write_page classifies the page as structural, routing it through the
    // page-level MVCC path rather than the cell log.
    assert!(
        txn.structural_pages.contains(&pgno),
        "write_page should mark page as structural"
    );
    let commit_seq = manager.commit(&mut txn).expect("commit should succeed");
    let snapshot = Snapshot::new(commit_seq, SchemaEpoch::ZERO);
    let resolved = manager.version_store.resolve_visible_version(pgno, &snapshot);
    assert!(
        resolved.is_some(),
        "structural page should be visible in VersionStore"
    );
    assert_eq!(
        resolved.unwrap().data.as_bytes()[0],
        0xAB,
        "structural page data should match"
    );
}
#[test]
fn test_commit_logical_page_not_in_structural_set() {
    use crate::cell_visibility::CellKey;
    use fsqlite_types::{BtreeRef, SemanticKeyKind, TableId};
    let manager = mgr();
    let pgno = PageNumber::new(200).unwrap();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    let _ = manager.read_page(&mut txn, pgno);
    // A cell-level insert goes through the cell visibility log, so the page
    // is "logical" — it never enters the structural set.
    let cell_key = CellKey {
        btree: BtreeRef::Table(TableId::new(1)),
        kind: SemanticKeyKind::TableRow,
        key_digest: [0u8; 16],
    };
    let txn_token = txn.token();
    let recorded = manager
        .cell_log()
        .record_insert(cell_key, pgno, vec![1, 2, 3, 4], txn_token);
    assert!(recorded.is_some(), "record_insert should succeed");
    txn.write_set.push(pgno);
    assert!(
        !txn.structural_pages.contains(&pgno),
        "cell-only page should not be in structural_pages"
    );
    let commit_seq = manager.commit(&mut txn).expect("commit should succeed");
    // The committed cell payload resolves through the cell log.
    assert_eq!(
        manager.cell_log().resolve(pgno, &cell_key, commit_seq),
        Some(vec![1, 2, 3, 4])
    );
    // The commit index still tracks the logical page at this commit seq.
    assert!(
        manager.commit_index.latest(pgno).is_some(),
        "commit_index should have entry for logical page"
    );
    assert_eq!(
        manager.commit_index.latest(pgno).unwrap(),
        commit_seq,
        "commit_index should have correct commit_seq"
    );
}
#[test]
fn test_commit_mixed_structural_and_logical_pages() {
    use crate::cell_visibility::CellKey;
    use fsqlite_types::{BtreeRef, SemanticKeyKind, TableId};
    let manager = mgr();
    let structural_pgno = PageNumber::new(300).unwrap();
    let logical_pgno = PageNumber::new(301).unwrap();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    let _ = manager.read_page(&mut txn, logical_pgno);
    // Structural write: full-page MVCC path.
    let payload = test_data(0xCD);
    manager
        .write_page(&mut txn, structural_pgno, payload.clone())
        .expect("write should succeed");
    // Logical write: cell-log path on a different page.
    let cell_key = CellKey {
        btree: BtreeRef::Table(TableId::new(2)),
        kind: SemanticKeyKind::TableRow,
        key_digest: [1u8; 16],
    };
    let _ = manager
        .cell_log()
        .record_insert(cell_key, logical_pgno, vec![5, 6, 7, 8], txn.token());
    txn.write_set.push(logical_pgno);
    assert!(txn.structural_pages.contains(&structural_pgno));
    assert!(!txn.structural_pages.contains(&logical_pgno));
    let commit_seq = manager.commit(&mut txn).expect("commit should succeed");
    let snapshot = Snapshot::new(commit_seq, SchemaEpoch::ZERO);
    let structural_version = manager
        .version_store
        .resolve_visible_version(structural_pgno, &snapshot);
    assert!(
        structural_version.is_some(),
        "structural page should be in VersionStore"
    );
    assert_eq!(structural_version.unwrap().data.as_bytes()[0], 0xCD);
    // Both pages are indexed at the same commit sequence.
    assert_eq!(manager.commit_index.latest(structural_pgno), Some(commit_seq));
    assert_eq!(manager.commit_index.latest(logical_pgno), Some(commit_seq));
    assert_eq!(
        manager.cell_log().resolve(logical_pgno, &cell_key, commit_seq),
        Some(vec![5, 6, 7, 8])
    );
}
#[test]
fn test_abort_rolls_back_cell_deltas() {
    use crate::cell_visibility::CellKey;
    use fsqlite_types::{BtreeRef, SemanticKeyKind, TableId};
    let manager = mgr();
    let pgno = PageNumber::new(400).unwrap();
    let mut txn = manager.begin(BeginKind::Concurrent).unwrap();
    let _ = manager.read_page(&mut txn, pgno);
    let cell_key = CellKey {
        btree: BtreeRef::Table(TableId::new(3)),
        kind: SemanticKeyKind::TableRow,
        key_digest: [2u8; 16],
    };
    let recorded = manager
        .cell_log()
        .record_insert(cell_key, pgno, vec![9, 10, 11, 12], txn.token());
    assert!(recorded.is_some(), "record_insert should succeed");
    assert_eq!(manager.cell_log().delta_count(), 1);
    // Abort must undo the staged cell delta and clear structural bookkeeping.
    manager.abort(&mut txn);
    assert_eq!(txn.state, TransactionState::Aborted);
    assert!(
        txn.structural_pages.is_empty(),
        "structural_pages should be cleared after abort"
    );
    assert_eq!(manager.cell_log().delta_count(), 0);
    // Even at an unbounded commit horizon, the rolled-back cell is invisible.
    assert!(
        manager
            .cell_log()
            .resolve(pgno, &cell_key, CommitSeq::new(u64::MAX))
            .is_none(),
        "aborted cell delta should be rolled back"
    );
}
#[test]
fn test_cell_log_accessor() {
    // A brand-new manager starts with an empty cell visibility log.
    let manager = TransactionManager::new(PageSize::new(4096).unwrap());
    assert_eq!(
        manager.cell_log().delta_count(),
        0,
        "new cell_log should be empty"
    );
}
}