use rustc_hash::FxHashMap;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use crate::common::{new_concurrent_int64_map, ConcurrentInt64Map};
use crate::core::IsolationLevel;
use crate::storage::mvcc::VisibilityChecker;
/// Sentinel transaction id returned by `TransactionRegistry::begin_transaction`
/// when the registry is not accepting new transactions.
pub const INVALID_TRANSACTION_ID: i64 = -999999999;
/// Transaction id that the registry's visibility checks always treat as visible,
/// regardless of the viewer or isolation level.
// NOTE(review): presumably stamped on versions written during recovery — confirm with the writer side.
pub const RECOVERY_TRANSACTION_ID: i64 = -1;
/// Registry of transaction state used for visibility decisions.
///
/// Hands out transaction ids and sequence numbers, tracks which transactions
/// are active, committing, or committed, and stores the isolation level that
/// applies to each viewer.
pub struct TransactionRegistry {
/// Most recently allocated transaction id; `begin_transaction` increments it
/// and returns the incremented value.
next_txn_id: AtomicI64,
/// Active transactions: transaction id -> begin sequence.
active_transactions: ConcurrentInt64Map<i64>,
/// Committed transactions: transaction id -> commit sequence.
committed_transactions: ConcurrentInt64Map<i64>,
/// Transactions between `start_commit` and `complete_commit`:
/// transaction id -> commit sequence assigned by `start_commit`.
committing_transactions: ConcurrentInt64Map<i64>,
/// Isolation level used for transactions that have no per-transaction override.
global_isolation_level: RwLock<IsolationLevel>,
/// Per-transaction isolation level overrides, keyed by transaction id.
transaction_isolation_levels: RwLock<FxHashMap<i64, IsolationLevel>>,
/// When `false`, `begin_transaction` refuses to start new transactions.
accepting: AtomicBool,
/// Shared counter from which both begin and commit sequence numbers are drawn.
next_sequence: AtomicI64,
}
impl TransactionRegistry {
pub fn new() -> Self {
Self {
next_txn_id: AtomicI64::new(0),
active_transactions: new_concurrent_int64_map(),
committed_transactions: new_concurrent_int64_map(),
committing_transactions: new_concurrent_int64_map(),
global_isolation_level: RwLock::new(IsolationLevel::ReadCommitted),
transaction_isolation_levels: RwLock::new(FxHashMap::with_capacity_and_hasher(
100,
Default::default(),
)),
accepting: AtomicBool::new(true),
next_sequence: AtomicI64::new(0),
}
}
/// Sets (or replaces) the isolation level override for `txn_id`.
///
/// # Panics
/// Panics if the override table's lock is poisoned.
pub fn set_transaction_isolation_level(&self, txn_id: i64, level: IsolationLevel) {
    // The write guard is a temporary and is released at the end of this statement.
    self.transaction_isolation_levels
        .write()
        .unwrap()
        .insert(txn_id, level);
}
/// Drops the isolation level override for `txn_id`, if one exists.
///
/// Afterwards `get_isolation_level(txn_id)` falls back to the global level.
///
/// # Panics
/// Panics if the override table's lock is poisoned.
pub fn remove_transaction_isolation_level(&self, txn_id: i64) {
    // The write guard is a temporary and is released at the end of this statement.
    self.transaction_isolation_levels
        .write()
        .unwrap()
        .remove(&txn_id);
}
/// Replaces the global isolation level used by transactions without an override.
///
/// # Panics
/// Panics if the global level's lock is poisoned.
pub fn set_global_isolation_level(&self, level: IsolationLevel) {
    *self.global_isolation_level.write().unwrap() = level;
}
/// Returns a copy of the current global isolation level.
///
/// # Panics
/// Panics if the global level's lock is poisoned.
pub fn get_global_isolation_level(&self) -> IsolationLevel {
    let current = self.global_isolation_level.read().unwrap();
    *current
}
/// Returns the isolation level that applies to `txn_id`.
///
/// A per-transaction override wins; without one the global level is returned.
///
/// # Panics
/// Panics if either isolation-level lock is poisoned.
pub fn get_isolation_level(&self, txn_id: i64) -> IsolationLevel {
    // Copy the override out so the read guard on the override table is
    // released before the global level's lock is taken.
    let override_level = self
        .transaction_isolation_levels
        .read()
        .unwrap()
        .get(&txn_id)
        .copied();

    override_level.unwrap_or_else(|| self.get_global_isolation_level())
}
/// Starts a new transaction and registers it as active.
///
/// Returns `(txn_id, begin_seq)`. When the registry is not accepting
/// transactions, returns `(INVALID_TRANSACTION_ID, 0)` and registers nothing.
pub fn begin_transaction(&self) -> (i64, i64) {
    let accepting = self.accepting.load(Ordering::Acquire);
    if accepting {
        // `fetch_add` yields the previous value; the new id/sequence is one past it.
        let txn_id = 1 + self.next_txn_id.fetch_add(1, Ordering::AcqRel);
        let begin_seq = 1 + self.next_sequence.fetch_add(1, Ordering::AcqRel);
        self.active_transactions.insert(txn_id, begin_seq);
        (txn_id, begin_seq)
    } else {
        (INVALID_TRANSACTION_ID, 0)
    }
}
/// First phase of a two-phase commit.
///
/// Allocates a commit sequence, moves `txn_id` from the active set into the
/// committing set, and returns the allocated sequence. The transaction is not
/// yet recorded as committed; `complete_commit` does that.
#[inline]
pub fn start_commit(&self, txn_id: i64) -> i64 {
    let previous_seq = self.next_sequence.fetch_add(1, Ordering::AcqRel);
    let commit_seq = previous_seq + 1;

    self.active_transactions.remove(&txn_id);
    self.committing_transactions.insert(txn_id, commit_seq);

    commit_seq
}
/// Second phase of a two-phase commit.
///
/// Moves `txn_id` from the committing set into the committed set, keeping the
/// commit sequence assigned by `start_commit`. If the transaction is not in
/// the committing set, the current value of the sequence counter is recorded
/// as its commit sequence instead.
#[inline]
pub fn complete_commit(&self, txn_id: i64) {
    // The map entry is only borrowed for the duration of this statement.
    let commit_seq = match self.committing_transactions.get(&txn_id) {
        Some(entry) => *entry,
        None => self.next_sequence.load(Ordering::Acquire),
    };

    self.committing_transactions.remove(&txn_id);
    self.committed_transactions.insert(txn_id, commit_seq);
}
/// Commits `txn_id` in a single step.
///
/// Allocates a commit sequence, removes the transaction from the active set,
/// records it as committed, and returns the commit sequence.
pub fn commit_transaction(&self, txn_id: i64) -> i64 {
    let previous_seq = self.next_sequence.fetch_add(1, Ordering::AcqRel);
    let commit_seq = previous_seq + 1;

    self.active_transactions.remove(&txn_id);
    self.committed_transactions.insert(txn_id, commit_seq);

    commit_seq
}
/// Re-registers a transaction that is known to have committed.
///
/// Records `txn_id -> commit_seq` in the committed set, then raises both
/// counters so they are never behind the recovered values:
/// `next_txn_id` becomes at least `txn_id + 1` and `next_sequence` becomes at
/// least `commit_seq + 1`. Counters are only ever raised, never lowered.
pub fn recover_committed_transaction(&self, txn_id: i64, commit_seq: i64) {
    self.committed_transactions.insert(txn_id, commit_seq);

    // `fetch_max` is the atomic "raise to at least" primitive; it replaces a
    // hand-written load / compare_exchange retry loop with the same outcome.
    self.next_txn_id.fetch_max(txn_id + 1, Ordering::AcqRel);
    self.next_sequence.fetch_max(commit_seq + 1, Ordering::AcqRel);
}
/// Accounts for a transaction id that was used by an aborted transaction.
///
/// Nothing is recorded for the transaction itself; the only effect is that
/// `next_txn_id` is raised to at least `txn_id + 1` so the id is not handed
/// out again. The counter is never lowered.
pub fn recover_aborted_transaction(&self, txn_id: i64) {
    // `fetch_max` is the atomic "raise to at least" primitive; it replaces a
    // hand-written load / compare_exchange retry loop with the same outcome.
    self.next_txn_id.fetch_max(txn_id + 1, Ordering::AcqRel);
}
/// Aborts `txn_id` by removing it from the active and committing sets.
///
/// The transaction is never added to the committed set, so its versions stay
/// invisible to other transactions.
#[inline]
pub fn abort_transaction(&self, txn_id: i64) {
    // Same order as a commit would leave them: active first, then committing.
    for in_flight in &[&self.active_transactions, &self.committing_transactions] {
        in_flight.remove(&txn_id);
    }
}
/// Returns the commit sequence recorded for `txn_id`, or `None` if the
/// transaction is not in the committed set.
pub fn get_commit_sequence(&self, txn_id: i64) -> Option<i64> {
    let entry = self.committed_transactions.get(&txn_id)?;
    Some(*entry)
}
/// Returns the begin sequence of an active transaction.
///
/// Yields `0` when `txn_id` is not in the active set.
pub fn get_transaction_begin_sequence(&self, txn_id: i64) -> i64 {
    match self.active_transactions.get(&txn_id) {
        Some(entry) => *entry,
        None => 0,
    }
}
/// Returns the current value of the shared sequence counter, i.e. the most
/// recently allocated begin/commit sequence.
pub fn get_current_sequence(&self) -> i64 {
    let sequence_counter = &self.next_sequence;
    sequence_counter.load(Ordering::Acquire)
}
/// Viewer-independent visibility check.
///
/// A version is directly visible when it was written by
/// `RECOVERY_TRANSACTION_ID` or by a transaction in the committed set.
#[inline(always)]
pub fn is_directly_visible(&self, version_txn_id: i64) -> bool {
    version_txn_id == RECOVERY_TRANSACTION_ID
        || self.committed_transactions.contains_key(&version_txn_id)
}
/// Decides whether a version written by `version_txn_id` is visible to
/// `viewer_txn_id`.
///
/// A transaction always sees its own writes and those of
/// `RECOVERY_TRANSACTION_ID`. When the viewer has no isolation override and
/// the global level is `ReadCommitted`, visibility is simply "writer has
/// committed". Every other case is delegated to the snapshot check.
///
/// # Panics
/// Panics if either isolation-level lock is poisoned.
#[inline(always)]
pub fn is_visible(&self, version_txn_id: i64, viewer_txn_id: i64) -> bool {
    if version_txn_id == viewer_txn_id || version_txn_id == RECOVERY_TRANSACTION_ID {
        return true;
    }

    // The read guard is a temporary, so the override table is unlocked before
    // the global level's lock is touched.
    let viewer_has_override = self
        .transaction_isolation_levels
        .read()
        .unwrap()
        .contains_key(&viewer_txn_id);

    // Fast path: no override and a ReadCommitted global level.
    if !viewer_has_override
        && *self.global_isolation_level.read().unwrap() == IsolationLevel::ReadCommitted
    {
        return self.is_directly_visible(version_txn_id);
    }

    self.is_visible_snapshot(version_txn_id, viewer_txn_id)
}
/// Slow-path visibility check that honours the viewer's effective isolation level.
///
/// * `ReadCommitted` viewer: falls back to `is_directly_visible`.
/// * Otherwise the version is visible only if its writer is not mid-commit,
///   is in the committed set, the viewer is in the active set, and the
///   writer's commit sequence is not greater than the viewer's begin sequence.
#[inline(never)]
fn is_visible_snapshot(&self, version_txn_id: i64, viewer_txn_id: i64) -> bool {
    if self.get_isolation_level(viewer_txn_id) == IsolationLevel::ReadCommitted {
        return self.is_directly_visible(version_txn_id);
    }

    // A writer that has started but not finished committing is never visible.
    if self.committing_transactions.contains_key(&version_txn_id) {
        return false;
    }

    // The viewer is only looked up once the writer is known to be committed.
    self.committed_transactions
        .get(&version_txn_id)
        .map(|entry| *entry)
        .map_or(false, |commit_seq| {
            self.active_transactions
                .get(&viewer_txn_id)
                .map_or(false, |viewer_begin_seq| commit_seq <= *viewer_begin_seq)
        })
}
/// Removes old entries from the committed set and returns how many were removed.
///
/// Does nothing (returns 0) when the global isolation level is
/// `ReadCommitted`. Otherwise every committed transaction with a non-negative
/// id whose commit sequence is below the computed cutoff is removed; under
/// `SnapshotIsolation`, ids that are currently in the active set are kept.
///
/// The cutoff is `current_sequence - max_age_in_nanoseconds`, or `0` when
/// `Instant::now() - max_age` is not representable. The nanosecond count is
/// clamped to `i64::MAX` and the subtraction saturates, so very large
/// `max_age` values can no longer wrap into a bogus (possibly huge) cutoff or
/// overflow the subtraction.
// NOTE(review): the cutoff subtracts a nanosecond count from a sequence
// counter; the units look mismatched — confirm the intended retention rule.
pub fn cleanup_old_transactions(&self, max_age: Duration) -> i32 {
    let isolation_level = self.get_global_isolation_level();
    if isolation_level == IsolationLevel::ReadCommitted {
        return 0;
    }
    let snapshot_isolation = isolation_level == IsolationLevel::SnapshotIsolation;

    let cutoff_time = if Instant::now().checked_sub(max_age).is_some() {
        // `as_nanos` is a u128; clamp before narrowing instead of truncating.
        let max_age_nanos = max_age.as_nanos().min(i64::MAX as u128) as i64;
        self.next_sequence
            .load(Ordering::Acquire)
            .saturating_sub(max_age_nanos)
    } else {
        0
    };

    // Snapshot of the ids that are active right now (only needed under
    // snapshot isolation).
    let active_set: std::collections::HashSet<i64> = if snapshot_isolation {
        self.active_transactions
            .iter()
            .map(|entry| *entry.key())
            .collect()
    } else {
        std::collections::HashSet::new()
    };

    // Collect the victims first and remove them afterwards, so the committed
    // map is not mutated while it is being iterated.
    let txns_to_remove: Vec<i64> = self
        .committed_transactions
        .iter()
        .filter(|entry| {
            let txn_id = *entry.key();
            let commit_seq = *entry.value();
            // Negative ids (e.g. RECOVERY_TRANSACTION_ID) are never cleaned up.
            if txn_id < 0 {
                return false;
            }
            if snapshot_isolation && active_set.contains(&txn_id) {
                return false;
            }
            commit_seq < cutoff_time
        })
        .map(|entry| *entry.key())
        .collect();

    let mut removed = 0;
    for txn_id in txns_to_remove {
        self.committed_transactions.remove(&txn_id);
        removed += 1;
    }
    removed
}
/// Blocks until no transactions are active or `timeout` elapses.
///
/// Polls the active set every 10 ms. Returns `0` as soon as the active set is
/// empty; otherwise returns the number of transactions still active once the
/// deadline has passed (clamped to `i32::MAX`).
///
/// If `now + timeout` cannot be represented as an `Instant` (for example
/// `Duration::MAX`), the call no longer panics: it waits without a deadline
/// until the active set drains.
pub fn wait_for_active_transactions(&self, timeout: Duration) -> i32 {
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    // `None` means the deadline is not representable, i.e. "no deadline".
    let deadline = Instant::now().checked_add(timeout);

    loop {
        if deadline.map_or(false, |limit| Instant::now() > limit) {
            break;
        }
        if self.active_transactions.len() == 0 {
            return 0;
        }
        std::thread::sleep(POLL_INTERVAL);
    }

    // Clamp instead of truncating when narrowing usize -> i32.
    self.active_transactions.len().min(i32::MAX as usize) as i32
}
/// Closes the registry to new transactions.
///
/// Subsequent `begin_transaction` calls return `INVALID_TRANSACTION_ID`;
/// transactions that are already running are not affected.
pub fn stop_accepting_transactions(&self) {
    let accepting_flag = &self.accepting;
    accepting_flag.store(false, Ordering::Release);
}
/// Re-opens the registry so `begin_transaction` hands out transactions again.
pub fn start_accepting_transactions(&self) {
    let accepting_flag = &self.accepting;
    accepting_flag.store(true, Ordering::Release);
}
/// Shuts the registry down by refusing any further new transactions.
///
/// Has the same effect as `stop_accepting_transactions`; no other state is
/// touched.
pub fn shutdown(&self) {
    self.accepting.store(false, Ordering::Release);
}
/// Reports whether `begin_transaction` currently hands out new transactions.
pub fn is_accepting(&self) -> bool {
    let accepting_flag = &self.accepting;
    accepting_flag.load(Ordering::Acquire)
}
/// Number of transactions currently in the active set.
pub fn active_count(&self) -> usize {
    let active = &self.active_transactions;
    active.len()
}
/// Number of transactions currently recorded in the committed set.
pub fn committed_count(&self) -> usize {
    let committed = &self.committed_transactions;
    committed.len()
}
/// Returns `true` while `txn_id` is in the active set, i.e. it has begun and
/// has neither started committing, committed, nor aborted.
pub fn is_active(&self, txn_id: i64) -> bool {
    self.active_transactions.get(&txn_id).is_some()
}
/// Returns `true` when `txn_id` is recorded in the committed set.
pub fn is_committed(&self, txn_id: i64) -> bool {
    self.get_commit_sequence(txn_id).is_some()
}
/// Returns the current value of the shared sequence counter.
///
/// Identical to `get_current_sequence`; begin and commit sequences are drawn
/// from the same counter.
pub fn current_commit_sequence(&self) -> i64 {
    self.get_current_sequence()
}
/// Reports whether `txn_id` counts as committed at or before `cutoff_commit_seq`.
///
/// Returns `true` when:
/// * `txn_id` is negative,
/// * `txn_id` has no entry in the committed set, or
/// * its recorded commit sequence is `<= cutoff_commit_seq`.
///
/// Only a committed transaction with a later commit sequence yields `false`.
pub fn is_committed_before(&self, txn_id: i64, cutoff_commit_seq: i64) -> bool {
    txn_id < 0
        || self
            .committed_transactions
            .get(&txn_id)
            .map_or(true, |commit_seq| *commit_seq <= cutoff_commit_seq)
}
}
impl Default for TransactionRegistry {
fn default() -> Self {
Self::new()
}
}
/// Exposes the registry through the `VisibilityChecker` trait.
///
/// Every method except `get_active_transaction_ids` forwards to the inherent
/// method of the same name.
impl VisibilityChecker for TransactionRegistry {
    /// Forwards to the inherent `TransactionRegistry::is_visible`.
    fn is_visible(&self, version_txn_id: i64, viewing_txn_id: i64) -> bool {
        TransactionRegistry::is_visible(self, version_txn_id, viewing_txn_id)
    }

    /// Forwards to the inherent `TransactionRegistry::get_current_sequence`.
    fn get_current_sequence(&self) -> i64 {
        TransactionRegistry::get_current_sequence(self)
    }

    /// Collects the ids of all transactions currently in the active set, in
    /// the map's iteration order.
    fn get_active_transaction_ids(&self) -> Vec<i64> {
        self.active_transactions
            .iter()
            .map(|entry| *entry.key())
            .collect()
    }

    /// Forwards to the inherent `TransactionRegistry::is_committed_before`.
    fn is_committed_before(&self, txn_id: i64, cutoff_commit_seq: i64) -> bool {
        TransactionRegistry::is_committed_before(self, txn_id, cutoff_commit_seq)
    }
}
// Unit tests for `TransactionRegistry`: id/sequence allocation, commit and
// abort bookkeeping, visibility rules per isolation level, and recovery.
#[cfg(test)]
mod tests {
use super::*;
// Ids and begin sequences are positive and strictly increasing.
#[test]
fn test_begin_transaction() {
let registry = TransactionRegistry::new();
let (txn_id1, seq1) = registry.begin_transaction();
assert!(txn_id1 > 0);
assert!(seq1 > 0);
let (txn_id2, seq2) = registry.begin_transaction();
assert!(txn_id2 > txn_id1);
assert!(seq2 > seq1);
}
// A single-step commit moves the transaction from active to committed.
#[test]
fn test_commit_transaction() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.is_active(txn_id));
assert!(!registry.is_committed(txn_id));
registry.commit_transaction(txn_id);
assert!(!registry.is_active(txn_id));
assert!(registry.is_committed(txn_id));
}
// start_commit parks the transaction in the committing set;
// complete_commit then moves it to the committed set.
#[test]
fn test_two_phase_commit() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
let commit_seq = registry.start_commit(txn_id);
assert!(commit_seq > 0);
assert!(!registry.is_active(txn_id));
assert!(registry.committing_transactions.contains_key(&txn_id));
registry.complete_commit(txn_id);
assert!(!registry.committing_transactions.contains_key(&txn_id));
assert!(registry.is_committed(txn_id));
}
// An aborted transaction is neither active nor committed.
#[test]
fn test_abort_transaction() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.is_active(txn_id));
registry.abort_transaction(txn_id);
assert!(!registry.is_active(txn_id));
assert!(!registry.is_committed(txn_id));
}
// A transaction always sees its own writes.
#[test]
fn test_visibility_own_writes() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.is_visible(txn_id, txn_id));
}
// Versions written by RECOVERY_TRANSACTION_ID are visible to every viewer.
#[test]
fn test_visibility_recovery_transaction() {
let registry = TransactionRegistry::new();
let (viewer_id, _) = registry.begin_transaction();
assert!(registry.is_visible(RECOVERY_TRANSACTION_ID, viewer_id));
}
// Under ReadCommitted, a writer becomes visible as soon as it commits,
// even to a viewer that started before the commit.
#[test]
fn test_visibility_read_committed() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::ReadCommitted);
let (txn1, _) = registry.begin_transaction();
let (txn2, _) = registry.begin_transaction();
assert!(!registry.is_visible(txn1, txn2));
registry.commit_transaction(txn1);
assert!(registry.is_visible(txn1, txn2));
}
// Under SnapshotIsolation, a viewer sees commits that happened before it
// began (txn1) but not commits that happened afterwards (txn3).
#[test]
fn test_visibility_snapshot_isolation() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::SnapshotIsolation);
let (txn1, _) = registry.begin_transaction();
registry.commit_transaction(txn1);
let (txn2, _) = registry.begin_transaction();
assert!(registry.is_visible(txn1, txn2));
let (txn3, _) = registry.begin_transaction();
registry.commit_transaction(txn3);
assert!(!registry.is_visible(txn3, txn2));
}
// Once the registry stops accepting, begin_transaction returns the
// INVALID_TRANSACTION_ID sentinel.
#[test]
fn test_stop_accepting() {
let registry = TransactionRegistry::new();
assert!(registry.is_accepting());
registry.stop_accepting_transactions();
assert!(!registry.is_accepting());
let (txn_id, _) = registry.begin_transaction();
assert_eq!(txn_id, INVALID_TRANSACTION_ID);
}
// A per-transaction override takes precedence over the global level and
// removing it restores the global level.
#[test]
fn test_isolation_level_override() {
let registry = TransactionRegistry::new();
registry.set_global_isolation_level(IsolationLevel::ReadCommitted);
let (txn_id, _) = registry.begin_transaction();
assert_eq!(
registry.get_isolation_level(txn_id),
IsolationLevel::ReadCommitted
);
registry.set_transaction_isolation_level(txn_id, IsolationLevel::SnapshotIsolation);
assert_eq!(
registry.get_isolation_level(txn_id),
IsolationLevel::SnapshotIsolation
);
registry.remove_transaction_isolation_level(txn_id);
assert_eq!(
registry.get_isolation_level(txn_id),
IsolationLevel::ReadCommitted
);
}
// get_commit_sequence is None before commit and returns the sequence that
// commit_transaction handed back afterwards.
#[test]
fn test_get_commit_sequence() {
let registry = TransactionRegistry::new();
let (txn_id, _) = registry.begin_transaction();
assert!(registry.get_commit_sequence(txn_id).is_none());
let commit_seq = registry.commit_transaction(txn_id);
assert_eq!(registry.get_commit_sequence(txn_id), Some(commit_seq));
}
// Recovering a committed transaction records its commit sequence and pushes
// the id counter past the recovered id.
#[test]
fn test_recover_committed_transaction() {
let registry = TransactionRegistry::new();
registry.recover_committed_transaction(1000, 500);
assert!(registry.is_committed(1000));
assert_eq!(registry.get_commit_sequence(1000), Some(500));
let (new_id, _) = registry.begin_transaction();
assert!(new_id > 1000);
}
}