use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU8, AtomicUsize, Ordering};
use parking_lot::Mutex;
use crate::common::I64Map;
use crate::core::IsolationLevel;
use crate::storage::VisibilityChecker;
pub const INVALID_TRANSACTION_ID: i64 = -999999999;
pub const RECOVERY_TRANSACTION_ID: i64 = -1;
const ABORTED_SENTINEL: i64 = -1;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum TxnStatus {
Active = 0,
Committing = 1,
Aborted = 2,
}
#[derive(Clone, Copy, Debug)]
pub struct TxnState {
begin_seq: i64,
state_seq: i64,
}
const STATUS_SHIFT: u32 = 62;
const SEQ_MASK: i64 = (1i64 << STATUS_SHIFT) - 1;
impl TxnState {
#[inline]
const fn new_active(begin_seq: i64) -> Self {
Self {
begin_seq,
state_seq: 0, }
}
#[inline]
const fn new_aborted() -> Self {
Self {
begin_seq: ABORTED_SENTINEL,
state_seq: (TxnStatus::Aborted as i64) << STATUS_SHIFT,
}
}
#[inline(always)]
pub const fn is_aborted(&self) -> bool {
self.begin_seq == ABORTED_SENTINEL
}
#[inline(always)]
pub const fn begin_seq(&self) -> i64 {
if self.begin_seq == ABORTED_SENTINEL {
0
} else {
self.begin_seq
}
}
#[inline(always)]
pub const fn status(&self) -> TxnStatus {
if self.begin_seq == ABORTED_SENTINEL {
return TxnStatus::Aborted;
}
match (self.state_seq >> STATUS_SHIFT) as u8 {
0 => TxnStatus::Active,
1 => TxnStatus::Committing,
_ => TxnStatus::Aborted,
}
}
#[inline(always)]
pub const fn is_active_or_committing(&self) -> bool {
self.begin_seq != ABORTED_SENTINEL
}
#[inline(always)]
fn set_committing(&mut self, commit_seq: i64) {
self.state_seq = commit_seq | (1i64 << STATUS_SHIFT);
}
#[inline(always)]
pub const fn commit_seq(&self) -> i64 {
self.state_seq & SEQ_MASK
}
}
const CACHE_SIZE: usize = 65536;
const CACHE_SHIFT: u32 = CACHE_SIZE.trailing_zeros();
thread_local! {
static COMMITTED_CACHE: RefCell<CommittedCache> = const { RefCell::new(CommittedCache::new()) };
}
struct CommittedCache {
entries: [i64; CACHE_SIZE],
}
impl CommittedCache {
#[inline]
const fn new() -> Self {
Self {
entries: [0; CACHE_SIZE],
}
}
#[inline(always)]
fn cache_index(txn_id: i64) -> usize {
let x = txn_id as u64;
((x ^ (x >> CACHE_SHIFT)) as usize) & (CACHE_SIZE - 1)
}
#[inline(always)]
fn contains(&self, txn_id: i64) -> bool {
let idx = Self::cache_index(txn_id);
self.entries[idx] == txn_id
}
#[inline(always)]
fn insert(&mut self, txn_id: i64) {
let idx = Self::cache_index(txn_id);
self.entries[idx] = txn_id;
}
}
pub struct TransactionRegistry {
transactions: Mutex<I64Map<TxnState>>,
snapshot_seqs: Mutex<I64Map<i64>>,
next_txn_id: AtomicI64,
next_sequence: AtomicI64,
global_isolation_level: AtomicU8,
isolation_overrides: Mutex<I64Map<u8>>,
override_count: AtomicUsize,
active_txn_count: AtomicUsize,
accepting: AtomicBool,
}
impl TransactionRegistry {
pub fn new() -> Self {
Self::with_capacity(1024)
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
transactions: Mutex::new(I64Map::with_capacity(capacity)),
snapshot_seqs: Mutex::new(I64Map::new()),
next_txn_id: AtomicI64::new(0),
next_sequence: AtomicI64::new(0),
global_isolation_level: AtomicU8::new(0),
isolation_overrides: Mutex::new(I64Map::new()),
override_count: AtomicUsize::new(0),
active_txn_count: AtomicUsize::new(0),
accepting: AtomicBool::new(true),
}
}
#[inline(always)]
const fn isolation_to_u8(level: IsolationLevel) -> u8 {
match level {
IsolationLevel::ReadCommitted => 0,
IsolationLevel::SnapshotIsolation => 1,
}
}
#[inline(always)]
const fn u8_to_isolation(value: u8) -> IsolationLevel {
match value {
0 => IsolationLevel::ReadCommitted,
_ => IsolationLevel::SnapshotIsolation,
}
}
pub fn set_global_isolation_level(&self, level: IsolationLevel) {
self.global_isolation_level
.store(Self::isolation_to_u8(level), Ordering::Release);
}
#[inline(always)]
pub fn get_global_isolation_level(&self) -> IsolationLevel {
Self::u8_to_isolation(self.global_isolation_level.load(Ordering::Acquire))
}
pub fn set_transaction_isolation_level(&self, txn_id: i64, level: IsolationLevel) {
let mut map = self.isolation_overrides.lock();
let is_new = !map.contains_key(txn_id);
map.insert(txn_id, Self::isolation_to_u8(level));
if is_new {
self.override_count.fetch_add(1, Ordering::Relaxed);
}
}
pub fn remove_transaction_isolation_level(&self, txn_id: i64) {
if self.isolation_overrides.lock().remove(txn_id).is_some() {
self.override_count.fetch_sub(1, Ordering::Relaxed);
}
}
#[inline(always)]
pub fn get_isolation_level(&self, txn_id: i64) -> IsolationLevel {
if self.override_count.load(Ordering::Relaxed) > 0 {
if let Some(&level) = self.isolation_overrides.lock().get(txn_id) {
return Self::u8_to_isolation(level);
}
}
self.get_global_isolation_level()
}
#[inline(always)]
fn needs_snapshot_isolation(&self, txn_id: i64) -> bool {
let global = self.global_isolation_level.load(Ordering::Relaxed);
if global == 1 {
return true;
}
if self.override_count.load(Ordering::Relaxed) > 0 {
if let Some(&level) = self.isolation_overrides.lock().get(txn_id) {
return level == 1;
}
}
false
}
pub fn has_active_snapshot_transactions(&self) -> bool {
let global = self.global_isolation_level.load(Ordering::Relaxed);
if global == 1 {
return !self.transactions.lock().is_empty();
}
if self.override_count.load(Ordering::Relaxed) > 0 {
let transactions = self.transactions.lock();
let overrides = self.isolation_overrides.lock();
for txn_id in transactions.keys() {
if overrides.get(txn_id) == Some(&1) {
return true;
}
}
}
false
}
pub fn get_snapshot_transaction_ids(&self) -> Vec<i64> {
let global = self.global_isolation_level.load(Ordering::Relaxed);
let transactions = self.transactions.lock();
if global == 1 {
return transactions.keys().collect();
}
if self.override_count.load(Ordering::Relaxed) > 0 {
let overrides = self.isolation_overrides.lock();
return transactions
.keys()
.filter(|txn_id| overrides.get(*txn_id) == Some(&1))
.collect();
}
Vec::new()
}
pub fn get_min_snapshot_begin_seq(&self) -> Option<i64> {
let global = self.global_isolation_level.load(Ordering::Relaxed);
let transactions = self.transactions.lock();
if global == 1 {
return transactions
.values()
.filter(|s| s.is_active_or_committing())
.map(|s| s.begin_seq())
.min();
}
if self.override_count.load(Ordering::Relaxed) > 0 {
let overrides = self.isolation_overrides.lock();
return transactions
.iter()
.filter(|(id, s)| s.is_active_or_committing() && overrides.get(*id) == Some(&1))
.map(|(_, s)| s.begin_seq())
.min();
}
None
}
pub fn begin_transaction(&self) -> (i64, i64) {
if !self.accepting.load(Ordering::Acquire) {
return (INVALID_TRANSACTION_ID, 0);
}
let txn_id = self.next_txn_id.fetch_add(1, Ordering::AcqRel) + 1;
let begin_seq = self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1;
self.transactions
.lock()
.insert(txn_id, TxnState::new_active(begin_seq));
self.active_txn_count.fetch_add(1, Ordering::Relaxed);
(txn_id, begin_seq)
}
#[inline]
pub fn start_commit(&self, txn_id: i64) -> i64 {
let commit_seq = self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1;
if let Some(entry) = self.transactions.lock().get_mut(txn_id) {
entry.set_committing(commit_seq);
}
commit_seq
}
#[inline]
pub fn complete_commit(&self, txn_id: i64) {
{
let mut txns = self.transactions.lock();
let seq = txns.get(txn_id).map(|e| e.commit_seq()).unwrap_or(0);
if self.global_isolation_level.load(Ordering::Relaxed) == 1
|| self.override_count.load(Ordering::Relaxed) > 0
{
self.snapshot_seqs.lock().insert(txn_id, seq);
}
txns.remove(txn_id);
};
self.active_txn_count.fetch_sub(1, Ordering::Relaxed);
}
pub fn commit_transaction(&self, txn_id: i64) -> i64 {
let commit_seq = self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1;
{
let mut txns = self.transactions.lock();
if self.global_isolation_level.load(Ordering::Relaxed) == 1
|| self.override_count.load(Ordering::Relaxed) > 0
{
self.snapshot_seqs.lock().insert(txn_id, commit_seq);
}
txns.remove(txn_id);
}
self.active_txn_count.fetch_sub(1, Ordering::Relaxed);
commit_seq
}
#[inline]
pub fn abort_transaction(&self, txn_id: i64) {
let should_decrement = {
let mut txns = self.transactions.lock();
if let Some(state) = txns.get_mut(txn_id) {
let was_active = state.is_active_or_committing();
*state = TxnState::new_aborted();
was_active
} else {
false
}
};
if should_decrement {
self.active_txn_count.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn recover_committed_transaction(&self, txn_id: i64, commit_seq: i64) {
self.snapshot_seqs.lock().insert(txn_id, commit_seq);
loop {
let current = self.next_txn_id.load(Ordering::Acquire);
if txn_id > current {
if self
.next_txn_id
.compare_exchange(current, txn_id, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
} else {
break;
}
}
loop {
let current = self.next_sequence.load(Ordering::Acquire);
if commit_seq > current {
if self
.next_sequence
.compare_exchange(current, commit_seq, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
} else {
break;
}
}
}
pub fn recover_aborted_transaction(&self, txn_id: i64) {
self.transactions
.lock()
.insert(txn_id, TxnState::new_aborted());
loop {
let current = self.next_txn_id.load(Ordering::Acquire);
if txn_id > current {
if self
.next_txn_id
.compare_exchange(current, txn_id, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
} else {
break;
}
}
}
#[inline(always)]
pub fn is_visible(&self, version_txn_id: i64, viewer_txn_id: i64) -> bool {
if version_txn_id == viewer_txn_id {
return true;
}
if version_txn_id == RECOVERY_TRANSACTION_ID {
return true;
}
let needs_snapshot = self.needs_snapshot_isolation(viewer_txn_id);
if needs_snapshot {
self.is_visible_snapshot(version_txn_id, viewer_txn_id)
} else {
self.check_committed(version_txn_id)
}
}
#[inline(always)]
fn check_committed(&self, txn_id: i64) -> bool {
if COMMITTED_CACHE.with(|c| c.borrow().contains(txn_id)) {
return true;
}
if self.transactions.lock().contains_key(txn_id) {
return false;
}
let next = self.next_txn_id.load(Ordering::Acquire);
if txn_id > 0 && txn_id <= next {
COMMITTED_CACHE.with(|c| c.borrow_mut().insert(txn_id));
return true;
}
false
}
#[inline(always)]
pub fn is_directly_visible(&self, version_txn_id: i64) -> bool {
if version_txn_id == RECOVERY_TRANSACTION_ID {
return true;
}
self.check_committed(version_txn_id)
}
#[cold]
#[inline(never)]
fn is_visible_snapshot(&self, version_txn_id: i64, viewer_txn_id: i64) -> bool {
let (version_state, viewer_begin_seq) = {
let txns = self.transactions.lock();
let viewer_begin_seq = match txns.get(viewer_txn_id) {
Some(state) if state.is_active_or_committing() => state.begin_seq(),
_ => {
drop(txns);
return self.check_committed(version_txn_id);
}
};
let version_state = txns.get(version_txn_id).copied();
(version_state, viewer_begin_seq)
};
match version_state {
Some(state) => {
if state.is_aborted() {
return false; }
false
}
None => {
let next = self.next_txn_id.load(Ordering::Acquire);
if version_txn_id <= 0 || version_txn_id > next {
return false; }
let commit_seq = self.snapshot_seqs.lock().get(version_txn_id).copied();
if let Some(commit_seq) = commit_seq {
return commit_seq <= viewer_begin_seq;
}
true
}
}
}
pub fn get_commit_sequence(&self, txn_id: i64) -> Option<i64> {
if let Some(&seq) = self.snapshot_seqs.lock().get(txn_id) {
return Some(seq);
}
if let Some(state) = self.transactions.lock().get(txn_id) {
if state.is_aborted() || state.is_active_or_committing() {
return None;
}
}
let next = self.next_txn_id.load(Ordering::Acquire);
if txn_id > 0 && txn_id <= next {
return Some(0); }
None
}
pub fn get_transaction_begin_sequence(&self, txn_id: i64) -> i64 {
self.transactions
.lock()
.get(txn_id)
.map(|e| e.begin_seq())
.unwrap_or(0)
}
pub fn get_committing_sequence(&self, txn_id: i64) -> i64 {
self.transactions
.lock()
.get(txn_id)
.map(|e| e.commit_seq())
.unwrap_or(0)
}
pub fn get_current_sequence(&self) -> i64 {
self.next_sequence.load(Ordering::Acquire)
}
pub fn current_commit_sequence(&self) -> i64 {
self.next_sequence.load(Ordering::Acquire)
}
pub fn safe_snapshot_cutoff(&self) -> i64 {
let txns = self.transactions.lock();
let mut min_committing = i64::MAX;
for (_, entry) in txns.iter() {
if entry.status() == TxnStatus::Committing {
let seq = entry.commit_seq();
if seq > 0 && seq < min_committing {
min_committing = seq;
}
}
}
let current = self.next_sequence.load(Ordering::Acquire);
drop(txns);
if min_committing == i64::MAX {
current
} else {
min_committing - 1
}
}
pub fn run_gc(&self) -> usize {
let mut removed = 0;
let override_keys: Vec<i64> = if self.override_count.load(Ordering::Relaxed) > 0 {
self.isolation_overrides.lock().keys().collect()
} else {
Vec::new()
};
let (min_begin_seq, invalid_overrides) = {
let mut txns = self.transactions.lock();
let mut min_begin = i64::MAX;
let mut min_id = i64::MAX;
for (id, state) in txns.iter() {
if state.is_active_or_committing() {
min_begin = min_begin.min(state.begin_seq());
min_id = min_id.min(id);
}
}
if min_id == i64::MAX {
min_id = self.next_txn_id.load(Ordering::Acquire);
}
let aborted_cutoff = min_id.saturating_sub(10000);
if aborted_cutoff > 0 {
let len_before = txns.len();
txns.retain(|id, state| !state.is_aborted() || id >= aborted_cutoff);
removed += len_before - txns.len();
}
let invalid: Vec<i64> = override_keys
.iter()
.filter(|&&txn_id| {
match txns.get(txn_id) {
None => true, Some(state) => state.is_aborted(),
}
})
.copied()
.collect();
(min_begin, invalid)
};
{
let mut seqs = self.snapshot_seqs.lock();
let len_before = seqs.len();
seqs.retain(|_, &mut commit_seq| commit_seq >= min_begin_seq);
removed += len_before - seqs.len();
}
if !invalid_overrides.is_empty() {
let mut overrides = self.isolation_overrides.lock();
let mut actual_removals = 0usize;
for txn_id in &invalid_overrides {
if overrides.remove(*txn_id).is_some() {
removed += 1;
actual_removals += 1;
}
}
if actual_removals > 0 {
self.override_count
.fetch_sub(actual_removals, Ordering::Relaxed);
}
}
removed
}
pub fn cleanup_old_transactions(&self, _max_age: std::time::Duration) -> i32 {
self.run_gc() as i32
}
pub fn wait_for_active_transactions(&self, timeout: std::time::Duration) -> i32 {
let deadline = crate::common::time_compat::Instant::now() + timeout;
loop {
if crate::common::time_compat::Instant::now() > deadline {
break;
}
let count = self.active_count();
if count == 0 {
return 0;
}
#[cfg(not(target_arch = "wasm32"))]
std::thread::sleep(std::time::Duration::from_millis(10));
#[cfg(target_arch = "wasm32")]
break;
}
self.active_count() as i32
}
pub fn stop_accepting_transactions(&self) {
self.accepting.store(false, Ordering::Release);
}
pub fn start_accepting_transactions(&self) {
self.accepting.store(true, Ordering::Release);
}
pub fn shutdown(&self) {
self.stop_accepting_transactions();
}
pub fn is_accepting(&self) -> bool {
self.accepting.load(Ordering::Acquire)
}
#[inline]
pub fn active_count(&self) -> usize {
self.active_txn_count.load(Ordering::Relaxed)
}
pub fn active_transaction_ids(&self) -> Vec<i64> {
self.transactions
.lock()
.iter()
.filter(|(_, s)| {
let status = s.status();
status == TxnStatus::Active || status == TxnStatus::Committing
})
.map(|(id, _)| id)
.collect()
}
pub fn committed_count(&self) -> usize {
self.snapshot_seqs.lock().len()
}
pub fn is_active(&self, txn_id: i64) -> bool {
self.transactions
.lock()
.get(txn_id)
.map(|e| e.status() == TxnStatus::Active)
.unwrap_or(false)
}
pub fn is_committed(&self, txn_id: i64) -> bool {
if self.transactions.lock().contains_key(txn_id) {
return false;
}
let next = self.next_txn_id.load(Ordering::Acquire);
txn_id > 0 && txn_id <= next
}
#[cfg(test)]
pub fn is_committing(&self, txn_id: i64) -> bool {
self.transactions
.lock()
.get(txn_id)
.map(|e| e.status() == TxnStatus::Committing)
.unwrap_or(false)
}
pub fn is_committed_before(&self, txn_id: i64, cutoff_commit_seq: i64) -> bool {
if txn_id < 0 {
return true;
}
if self.transactions.lock().contains_key(txn_id) {
return false;
}
if let Some(&commit_seq) = self.snapshot_seqs.lock().get(txn_id) {
return commit_seq <= cutoff_commit_seq;
}
let next = self.next_txn_id.load(Ordering::Acquire);
txn_id > 0 && txn_id <= next
}
}
impl Default for TransactionRegistry {
fn default() -> Self {
Self::new()
}
}
impl VisibilityChecker for TransactionRegistry {
fn is_visible(&self, version_txn_id: i64, viewing_txn_id: i64) -> bool {
TransactionRegistry::is_visible(self, version_txn_id, viewing_txn_id)
}
fn get_current_sequence(&self) -> i64 {
TransactionRegistry::get_current_sequence(self)
}
fn get_active_transaction_ids(&self) -> Vec<i64> {
self.transactions
.lock()
.iter()
.filter(|(_, s)| s.status() == TxnStatus::Active)
.map(|(id, _)| id)
.collect()
}
fn is_committed_before(&self, txn_id: i64, cutoff_commit_seq: i64) -> bool {
TransactionRegistry::is_committed_before(self, txn_id, cutoff_commit_seq)
}
fn needs_snapshot_isolation(&self, txn_id: i64) -> bool {
TransactionRegistry::needs_snapshot_isolation(self, txn_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_begin_transaction() {
let registry = TransactionRegistry::new();
let (txn_id1, seq1) = registry.begin_transaction();
assert!(txn_id1 > 0);
assert!(seq1 > 0);
let (txn_id2, seq2) = registry.begin_transaction();
assert!(txn_id2 > txn_id1);
assert!(seq2 > seq1);
}
#[test]
fn test_commit_transaction() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.is_active(txn_id));
assert!(!registry.is_committed(txn_id));
registry.commit_transaction(txn_id);
assert!(!registry.is_active(txn_id));
assert!(registry.is_committed(txn_id));
}
#[test]
fn test_two_phase_commit() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
let commit_seq = registry.start_commit(txn_id);
assert!(commit_seq > 0);
assert!(!registry.is_active(txn_id));
assert!(registry.is_committing(txn_id));
registry.complete_commit(txn_id);
assert!(!registry.is_committing(txn_id));
assert!(registry.is_committed(txn_id));
}
#[test]
fn test_abort_transaction() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.is_active(txn_id));
registry.abort_transaction(txn_id);
assert!(!registry.is_active(txn_id));
assert!(!registry.is_committed(txn_id));
let state = registry.transactions.lock().get(txn_id).copied();
assert!(state.map(|s| s.is_aborted()).unwrap_or(false));
}
#[test]
fn test_visibility_own_writes() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.is_visible(txn_id, txn_id));
}
#[test]
fn test_visibility_recovery_transaction() {
let registry = TransactionRegistry::new();
let (viewer_id, _) = registry.begin_transaction();
assert!(registry.is_visible(RECOVERY_TRANSACTION_ID, viewer_id));
}
#[test]
fn test_visibility_read_committed() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::ReadCommitted);
let (txn1, _) = registry.begin_transaction();
let (txn2, _) = registry.begin_transaction();
assert!(!registry.is_visible(txn1, txn2));
registry.commit_transaction(txn1);
assert!(registry.is_visible(txn1, txn2));
}
#[test]
fn test_visibility_snapshot_isolation() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn1, _) = registry.begin_transaction();
registry.commit_transaction(txn1);
let (txn2, _) = registry.begin_transaction();
assert!(registry.is_visible(txn1, txn2));
let (txn3, _) = registry.begin_transaction();
registry.commit_transaction(txn3);
assert!(!registry.is_visible(txn3, txn2));
}
#[test]
fn test_stop_accepting() {
let registry = TransactionRegistry::new();
assert!(registry.is_accepting());
registry.stop_accepting_transactions();
assert!(!registry.is_accepting());
let (txn_id, _) = registry.begin_transaction();
assert_eq!(txn_id, INVALID_TRANSACTION_ID);
}
#[test]
fn test_isolation_level_override() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::ReadCommitted);
let (txn_id, _) = registry.begin_transaction();
assert_eq!(
registry.get_isolation_level(txn_id),
IsolationLevel::ReadCommitted
);
registry.set_transaction_isolation_level(txn_id, IsolationLevel::SnapshotIsolation);
assert_eq!(
registry.get_isolation_level(txn_id),
IsolationLevel::SnapshotIsolation
);
registry.remove_transaction_isolation_level(txn_id);
assert_eq!(
registry.get_isolation_level(txn_id),
IsolationLevel::ReadCommitted
);
}
#[test]
fn test_get_commit_sequence() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn_id, _) = registry.begin_transaction();
assert!(registry.get_commit_sequence(txn_id).is_none());
let commit_seq = registry.commit_transaction(txn_id);
assert_eq!(registry.get_commit_sequence(txn_id), Some(commit_seq));
}
#[test]
fn test_recover_committed_transaction() {
let registry = TransactionRegistry::new();
registry.recover_committed_transaction(1000, 500);
assert!(registry.is_committed(1000));
assert_eq!(registry.get_commit_sequence(1000), Some(500));
let (new_id, _) = registry.begin_transaction();
assert!(new_id > 1000);
}
#[test]
fn test_gc() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
for _ in 0..10 {
let (txn_id, _) = registry.begin_transaction();
registry.commit_transaction(txn_id);
}
assert_eq!(registry.snapshot_seqs.lock().len(), 10);
let (active_txn, _) = registry.begin_transaction();
for _ in 0..5 {
let (txn_id, _) = registry.begin_transaction();
registry.commit_transaction(txn_id);
}
let removed = registry.run_gc();
assert!(removed > 0);
registry.commit_transaction(active_txn);
}
#[test]
fn test_aborted_not_visible() {
let registry = TransactionRegistry::new();
let (txn1, _) = registry.begin_transaction();
let (txn2, _) = registry.begin_transaction();
registry.abort_transaction(txn1);
assert!(!registry.is_visible(txn1, txn2));
assert!(!registry.is_committed(txn1));
}
#[test]
fn test_per_transaction_snapshot_isolation() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::ReadCommitted);
let (txn1, _) = registry.begin_transaction();
registry.commit_transaction(txn1);
let (txn2, _) = registry.begin_transaction();
registry.set_transaction_isolation_level(txn2, IsolationLevel::SnapshotIsolation);
assert!(registry.is_visible(txn1, txn2));
let (txn3, _) = registry.begin_transaction();
registry.commit_transaction(txn3);
assert!(!registry.is_visible(txn3, txn2));
let (txn4, _) = registry.begin_transaction();
assert!(registry.is_visible(txn3, txn4));
}
#[test]
fn test_commit_read_committed_skips_snapshot_seqs() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::ReadCommitted);
let (txn_id, _) = registry.begin_transaction();
registry.commit_transaction(txn_id);
assert!(registry.snapshot_seqs.lock().is_empty());
}
#[test]
fn test_recover_committed_advances_next_txn_id() {
let registry = TransactionRegistry::new();
registry.recover_committed_transaction(100, 50);
let (new_id, _) = registry.begin_transaction();
assert!(new_id > 100);
}
#[test]
fn test_recover_committed_advances_next_sequence() {
let registry = TransactionRegistry::new();
registry.recover_committed_transaction(10, 200);
let (_, begin_seq) = registry.begin_transaction();
assert!(begin_seq > 200);
}
#[test]
fn test_recover_committed_descending_order() {
let registry = TransactionRegistry::new();
registry.recover_committed_transaction(100, 50);
registry.recover_committed_transaction(50, 30);
let (new_id, _) = registry.begin_transaction();
assert!(new_id > 100, "next_txn_id should be >= 100, got {}", new_id);
}
#[test]
fn test_recover_aborted_marks_aborted() {
let registry = TransactionRegistry::new();
registry.recover_aborted_transaction(42);
assert!(!registry.is_committed(42));
assert!(!registry.is_active(42));
let state = registry.transactions.lock().get(42).copied();
assert!(state.is_some());
assert!(state.unwrap().is_aborted());
}
#[test]
fn test_recover_aborted_advances_next_txn_id() {
let registry = TransactionRegistry::new();
registry.recover_aborted_transaction(100);
let (new_id, _) = registry.begin_transaction();
assert!(new_id > 100);
}
#[test]
fn test_recover_aborted_descending_order() {
let registry = TransactionRegistry::new();
registry.recover_aborted_transaction(200);
registry.recover_aborted_transaction(100);
let (new_id, _) = registry.begin_transaction();
assert!(new_id > 200, "next_txn_id should be > 200, got {}", new_id);
}
#[test]
fn test_recover_aborted_not_visible() {
let registry = TransactionRegistry::new();
registry.recover_aborted_transaction(5);
let (viewer, _) = registry.begin_transaction();
assert!(!registry.is_visible(5, viewer));
}
#[test]
fn test_check_committed_negative_txn_id_not_committed() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
registry.commit_transaction(txn_id);
assert!(!registry.check_committed(-5));
assert!(!registry.check_committed(-100));
}
#[test]
fn test_check_committed_future_txn_id() {
let registry = TransactionRegistry::new();
assert!(!registry.check_committed(1));
}
#[test]
fn test_check_committed_valid_committed() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn_id, _) = registry.begin_transaction();
registry.commit_transaction(txn_id);
assert!(registry.check_committed(txn_id));
}
#[test]
fn test_check_committed_boundary_next_txn_id() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
registry.commit_transaction(txn_id);
assert!(registry.check_committed(txn_id));
assert!(!registry.check_committed(txn_id + 1));
}
#[test]
fn test_is_directly_visible_recovery() {
let registry = TransactionRegistry::new();
assert!(registry.is_directly_visible(RECOVERY_TRANSACTION_ID));
}
#[test]
fn test_is_directly_visible_normal_txn() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn_id, _) = registry.begin_transaction();
assert!(!registry.is_directly_visible(txn_id));
registry.commit_transaction(txn_id);
assert!(registry.is_directly_visible(txn_id));
}
#[test]
fn test_is_directly_visible_non_recovery_negative() {
let registry = TransactionRegistry::new();
assert!(!registry.is_directly_visible(-99));
}
#[test]
fn test_snapshot_committed_viewer_fallback() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn1, _) = registry.begin_transaction();
registry.commit_transaction(txn1);
let (txn2, _) = registry.begin_transaction();
registry.commit_transaction(txn2);
assert!(registry.is_visible(txn1, txn2));
}
#[test]
fn test_snapshot_invalid_version_txn_zero() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (viewer, _) = registry.begin_transaction();
assert!(!registry.is_visible(0, viewer));
}
#[test]
fn test_snapshot_invalid_version_txn_negative() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (viewer, _) = registry.begin_transaction();
assert!(!registry.is_visible(-50, viewer));
}
#[test]
fn test_snapshot_future_version_txn_not_visible() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (viewer, _) = registry.begin_transaction();
let next = registry.next_txn_id.load(Ordering::Acquire);
assert!(!registry.is_visible(next + 1, viewer));
assert!(!registry.is_visible(next + 100, viewer));
}
#[test]
fn test_snapshot_boundary_version_equals_next() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn1, _) = registry.begin_transaction();
registry.commit_transaction(txn1);
let (viewer, _) = registry.begin_transaction();
assert!(registry.is_visible(txn1, viewer));
}
#[test]
fn test_get_commit_sequence_invalid_txn_id() {
let registry = TransactionRegistry::new();
assert_eq!(registry.get_commit_sequence(0), None);
assert_eq!(registry.get_commit_sequence(-1), None);
}
#[test]
fn test_get_commit_sequence_future_txn_id() {
let registry = TransactionRegistry::new();
assert_eq!(registry.get_commit_sequence(1), None);
}
#[test]
fn test_get_commit_sequence_active() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert_eq!(registry.get_commit_sequence(txn_id), None);
}
#[test]
fn test_get_commit_sequence_aborted() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
registry.abort_transaction(txn_id);
assert_eq!(registry.get_commit_sequence(txn_id), None);
}
#[test]
fn test_get_commit_sequence_committed_with_snapshot() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn_id, _) = registry.begin_transaction();
let commit_seq = registry.commit_transaction(txn_id);
assert_eq!(registry.get_commit_sequence(txn_id), Some(commit_seq));
assert!(commit_seq > 0);
}
}