use dashmap::DashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug)]
pub struct TimestampOracle {
next_ts: AtomicU64,
}
impl TimestampOracle {
pub fn new(start_ts: u64) -> Self {
Self {
next_ts: AtomicU64::new(start_ts),
}
}
pub fn next(&self) -> u64 {
self.next_ts.fetch_add(1, Ordering::SeqCst) + 1
}
pub fn read(&self) -> u64 {
self.next_ts.load(Ordering::SeqCst)
}
}
impl Default for TimestampOracle {
fn default() -> Self {
Self::new(1)
}
}
#[derive(Debug)]
pub struct TransactionManager {
oracle: Arc<TimestampOracle>,
active_txs: Arc<DashMap<u64, u64>>,
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
impl TransactionManager {
pub fn new() -> Self {
Self {
oracle: Arc::new(TimestampOracle::default()),
active_txs: Arc::new(DashMap::new()),
}
}
pub fn begin_transaction(&self) -> u64 {
let read_ts = self.oracle.next();
let tx_id = read_ts; self.active_txs.insert(tx_id, read_ts);
read_ts
}
pub fn allocate_commit_ts(&self) -> u64 {
self.oracle.next()
}
pub fn current_ts(&self) -> u64 {
self.oracle.read()
}
pub fn end_transaction(&self, tx_id: u64) {
self.active_txs.remove(&tx_id);
}
pub fn min_active_ts(&self) -> Option<u64> {
self.active_txs.iter().map(|entry| *entry.value()).min()
}
pub fn active_count(&self) -> usize {
self.active_txs.len()
}
}