use crate::api::transaction::types::TxId;
use crate::core::temporal::Timestamp;
use std::collections::HashSet;
use std::sync::{Arc, Mutex, PoisonError};
/// A captured view of transaction state used to decide which versions a reader may see.
///
/// Cloning is cheap with respect to the id set: the set lives behind an [`Arc`], so a clone
/// only bumps the reference count.
#[derive(Clone, Debug)]
pub struct TransactionSnapshot {
    /// Timestamp of the snapshot. `is_visible` only accepts commit timestamps that are
    /// strictly less than this value.
    pub snapshot_timestamp: Timestamp,
    /// Transaction ids whose versions `is_visible` hides, regardless of commit timestamp.
    pub active_transactions: Arc<HashSet<TxId>>,
}
impl TransactionSnapshot {
    /// Reports whether a version written by `created_by_tx` is visible to this snapshot.
    ///
    /// A version is visible only when all of the following hold:
    /// - it carries a commit timestamp (`None` is never visible),
    /// - that timestamp is strictly less than `snapshot_timestamp`,
    /// - `created_by_tx` is not in `active_transactions`.
    pub fn is_visible(&self, created_by_tx: TxId, commit_timestamp: Option<Timestamp>) -> bool {
        // No commit timestamp means there is nothing to compare against.
        let committed_at = match commit_timestamp {
            Some(ts) => ts,
            None => return false,
        };

        if committed_at < self.snapshot_timestamp {
            // The set lookup only runs once the timestamp test has passed.
            !self.active_transactions.contains(&created_by_tx)
        } else {
            false
        }
    }
}
/// Tracks the set of currently active transaction ids and hands out
/// [`TransactionSnapshot`]s of that set.
///
/// The set is stored as `Mutex<Arc<HashSet<TxId>>>`: the mutex serialises updates, and the
/// `Arc` lets a snapshot share the current set without copying it.
#[derive(Debug)]
pub struct TxVisibilityManager {
    /// Ids of transactions that have been registered as active and not yet committed or
    /// aborted.
    active: Mutex<Arc<HashSet<TxId>>>,
}
impl TxVisibilityManager {
    /// Creates a manager with an empty active-transaction set.
    pub fn new() -> Self {
        TxVisibilityManager {
            active: Mutex::new(Arc::new(HashSet::new())),
        }
    }

    /// Locks the active set, taking the guard out of a poisoned mutex instead of panicking.
    fn lock_active(&self) -> std::sync::MutexGuard<'_, Arc<HashSet<TxId>>> {
        self.active.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Removes `tx_id` from the active set; shared by commit and abort, which are identical
    /// as far as this manager is concerned.
    fn deactivate(&self, tx_id: &TxId) {
        let mut guard = self.lock_active();
        // `Arc::make_mut` copies the whole set when a snapshot still shares it, so only
        // call it when the removal will actually change something.
        if guard.contains(tx_id) {
            Arc::make_mut(&mut *guard).remove(tx_id);
        }
    }

    /// Adds `tx_id` to the active set. Registering an id that is already present is a no-op.
    pub fn register_active(&self, tx_id: TxId) {
        let mut guard = self.lock_active();
        // Skip `Arc::make_mut` (and its potential full copy of the set) for duplicates.
        if !guard.contains(&tx_id) {
            Arc::make_mut(&mut *guard).insert(tx_id);
        }
    }

    /// Returns a snapshot pairing `snapshot_timestamp` with the current active set.
    ///
    /// Only the `Arc` is cloned, so the snapshot shares the set rather than copying it.
    /// Later updates go through `Arc::make_mut`, which copies the set when it is shared,
    /// so a snapshot that was already handed out keeps the contents it was captured with.
    pub fn capture_snapshot(&self, snapshot_timestamp: Timestamp) -> TransactionSnapshot {
        let guard = self.lock_active();
        TransactionSnapshot {
            snapshot_timestamp,
            active_transactions: Arc::clone(&guard),
        }
    }

    /// Removes `tx_id` from the active set after a commit. Unknown ids are ignored.
    pub fn register_commit(&self, tx_id: TxId) {
        self.deactivate(&tx_id);
    }

    /// Removes `tx_id` from the active set after an abort. Unknown ids are ignored.
    pub fn register_abort(&self, tx_id: TxId) {
        self.deactivate(&tx_id);
    }

    /// Visibility check driven by the commit timestamp supplied by the caller.
    ///
    /// A transaction id whose `as_u64()` is `0` is always visible, whatever the timestamp.
    /// Every other id is delegated to [`TransactionSnapshot::is_visible`]. The manager's own
    /// state is not consulted; only `snapshot` and the arguments are read.
    pub fn is_visible_with_embedded_ts(
        &self,
        snapshot: &TransactionSnapshot,
        created_by_tx: TxId,
        commit_timestamp: Option<Timestamp>,
    ) -> bool {
        if created_by_tx.as_u64() == 0 {
            return true;
        }
        snapshot.is_visible(created_by_tx, commit_timestamp)
    }

    /// Returns the number of transaction ids currently in the active set.
    pub fn active_count(&self) -> usize {
        self.lock_active().len()
    }
}
impl Default for TxVisibilityManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot taken at `ts` whose active set holds exactly `active_ids`.
    fn snapshot_with(ts: Timestamp, active_ids: &[u64]) -> TransactionSnapshot {
        let active: HashSet<TxId> = active_ids.iter().map(|&id| TxId::new(id)).collect();
        TransactionSnapshot {
            snapshot_timestamp: ts,
            active_transactions: Arc::new(active),
        }
    }

    /// Builds a manager and registers every id in `ids` as active, in order.
    fn manager_with_active(ids: &[u64]) -> TxVisibilityManager {
        let manager = TxVisibilityManager::new();
        for &id in ids {
            manager.register_active(TxId::new(id));
        }
        manager
    }

    // --- TransactionSnapshot::is_visible -------------------------------------------------

    #[test]
    fn test_snapshot_visibility_committed_before() {
        let snapshot = snapshot_with(100.into(), &[]);
        let visible = snapshot.is_visible(TxId::new(1), Some(50.into()));
        assert!(visible);
    }

    #[test]
    fn test_snapshot_visibility_committed_after() {
        let snapshot = snapshot_with(100.into(), &[]);
        let visible = snapshot.is_visible(TxId::new(1), Some(150.into()));
        assert!(!visible);
    }

    #[test]
    fn test_snapshot_visibility_uncommitted() {
        let snapshot = snapshot_with(100.into(), &[]);
        let visible = snapshot.is_visible(TxId::new(1), None);
        assert!(!visible);
    }

    #[test]
    fn test_snapshot_visibility_active_transaction() {
        // Committed before the snapshot timestamp, but listed as active.
        let snapshot = snapshot_with(100.into(), &[1]);
        let visible = snapshot.is_visible(TxId::new(1), Some(50.into()));
        assert!(!visible);
    }

    // --- TxVisibilityManager bookkeeping -------------------------------------------------

    #[test]
    fn test_visibility_manager_creation() {
        let manager = manager_with_active(&[]);
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn test_register_active() {
        let manager = manager_with_active(&[1, 2]);
        assert_eq!(manager.active_count(), 2);
    }

    #[test]
    fn test_capture_snapshot() {
        let manager = manager_with_active(&[1, 2]);
        let snapshot = manager.capture_snapshot(100.into());

        assert_eq!(snapshot.snapshot_timestamp, 100.into());
        assert_eq!(snapshot.active_transactions.len(), 2);
        for id in 1..=2u64 {
            assert!(snapshot.active_transactions.contains(&TxId::new(id)));
        }
    }

    #[test]
    fn test_register_commit() {
        let manager = manager_with_active(&[1]);
        assert_eq!(manager.active_count(), 1);

        manager.register_commit(TxId::new(1));
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn test_register_abort() {
        let manager = manager_with_active(&[1]);
        assert_eq!(manager.active_count(), 1);

        manager.register_abort(TxId::new(1));
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn test_concurrent_snapshots() {
        let manager = manager_with_active(&[1]);
        let snapshot1 = manager.capture_snapshot(100.into());
        assert_eq!(snapshot1.active_transactions.len(), 1);

        manager.register_commit(TxId::new(1));
        manager.register_active(TxId::new(2));
        let snapshot2 = manager.capture_snapshot(120.into());
        assert!(snapshot2.active_transactions.contains(&TxId::new(2)));

        // The earlier snapshot must be unaffected by the later updates.
        assert_eq!(snapshot1.active_transactions.len(), 1);
        assert!(snapshot1.active_transactions.contains(&TxId::new(1)));
    }

    #[test]
    fn test_count_methods() {
        let manager = TxVisibilityManager::new();
        assert_eq!(manager.active_count(), 0);

        manager.register_active(TxId::new(1));
        manager.register_active(TxId::new(2));
        assert_eq!(manager.active_count(), 2);

        manager.register_commit(TxId::new(1));
        assert_eq!(manager.active_count(), 1);

        manager.register_abort(TxId::new(2));
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn test_concurrent_visibility_checks() {
        use std::thread;

        let manager = Arc::new(TxVisibilityManager::new());
        for id in 1..=10u64 {
            manager.register_active(TxId::new(id));
            manager.register_commit(TxId::new(id));
        }
        let snapshot = manager.capture_snapshot(101.into());

        // Start every worker before joining any of them.
        let mut workers = Vec::with_capacity(10);
        for worker in 0..10u64 {
            let shared_manager = Arc::clone(&manager);
            let local_snapshot = snapshot.clone();
            workers.push(thread::spawn(move || {
                for _ in 0..1000 {
                    let tx_id = TxId::new((worker % 10) + 1);
                    let commit_ts: Option<Timestamp> = Some(((worker * 10 + 1) as i64).into());
                    let _ = shared_manager.is_visible_with_embedded_ts(
                        &local_snapshot,
                        tx_id,
                        commit_ts,
                    );
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }
    }
}
#[cfg(test)]
mod embedded_timestamp_visibility_tests {
    use super::*;

    /// Returns a fresh manager together with a snapshot of its (empty) active set taken at
    /// timestamp 100.
    fn fresh_manager_and_snapshot() -> (TxVisibilityManager, TransactionSnapshot) {
        let manager = TxVisibilityManager::new();
        let snapshot = manager.capture_snapshot(100.into());
        (manager, snapshot)
    }

    #[test]
    fn test_is_visible_with_embedded_ts_committed_before_snapshot() {
        let (manager, snapshot) = fresh_manager_and_snapshot();
        let visible =
            manager.is_visible_with_embedded_ts(&snapshot, TxId::new(1), Some(50.into()));
        assert!(
            visible,
            "Version committed before snapshot should be visible via embedded timestamp"
        );
    }

    #[test]
    fn test_is_visible_with_embedded_ts_committed_after_snapshot() {
        let (manager, snapshot) = fresh_manager_and_snapshot();
        let visible =
            manager.is_visible_with_embedded_ts(&snapshot, TxId::new(1), Some(150.into()));
        assert!(
            !visible,
            "Version committed after snapshot should not be visible"
        );
    }

    #[test]
    fn test_is_visible_with_embedded_ts_concurrent_transaction() {
        let manager = TxVisibilityManager::new();
        manager.register_active(TxId::new(1));
        let snapshot = manager.capture_snapshot(100.into());
        // The commit happens after the snapshot was captured.
        manager.register_commit(TxId::new(1));

        let visible =
            manager.is_visible_with_embedded_ts(&snapshot, TxId::new(1), Some(90.into()));
        assert!(
            !visible,
            "Version from concurrent transaction should not be visible"
        );
    }

    #[test]
    fn test_is_visible_with_embedded_ts_tx_zero_always_visible() {
        let (manager, snapshot) = fresh_manager_and_snapshot();
        let visible =
            manager.is_visible_with_embedded_ts(&snapshot, TxId::new(0), Some(0.into()));
        assert!(
            visible,
            "TxId(0) pre-existing data must always be visible"
        );
    }

    #[test]
    fn test_is_visible_with_embedded_ts_matches_map_based_check() {
        let manager = TxVisibilityManager::new();
        manager.register_active(TxId::new(1));
        manager.register_commit(TxId::new(1));
        let snapshot = manager.capture_snapshot(100.into());

        let direct_result = snapshot.is_visible(TxId::new(1), Some(50.into()));
        let embedded_result =
            manager.is_visible_with_embedded_ts(&snapshot, TxId::new(1), Some(50.into()));
        assert_eq!(
            direct_result, embedded_result,
            "Embedded-ts visibility must match direct snapshot visibility check"
        );
    }

    #[test]
    fn test_is_visible_with_embedded_ts_no_committed_map_required() {
        let (manager, snapshot) = fresh_manager_and_snapshot();
        let commit_ts: Option<Timestamp> = Some(50.into());
        let visible = manager.is_visible_with_embedded_ts(&snapshot, TxId::new(42), commit_ts);
        assert!(
            visible,
            "Embedded-ts check must work without a committed-map entry"
        );
    }
}