use crate::write_set::{WriteSet, WriteSetApplicator};
use contextdb_core::{Error, Result, RowId, SnapshotId, TxId};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
/// Coordinates transaction ids, per-transaction write sets, LSN allocation and
/// commits against a backing store `S`.
pub struct TxManager<S>
where
    S: WriteSetApplicator,
{
    /// Id that the next call to `begin` will hand out.
    next_tx: AtomicU64,
    /// Highest transaction id committed so far; this is the value `snapshot` reports.
    committed_watermark: AtomicU64,
    /// LSN that the next commit or DDL allocation will receive.
    next_lsn: AtomicU64,
    /// Write sets of transactions that have begun and not yet committed or rolled back.
    active_txs: Mutex<HashMap<TxId, WriteSet>>,
    /// Held for the duration of commits, DDL LSN allocation and `with_commit_lock`.
    commit_mutex: Mutex<()>,
    /// Backing store that committed write sets are applied to.
    store: S,
}
impl<S: WriteSetApplicator> TxManager<S> {
pub fn new(store: S) -> Self {
Self::new_with_counters(store, 1, 1, 0)
}
pub fn new_with_counters(
store: S,
next_tx: TxId,
next_lsn: u64,
committed_watermark: TxId,
) -> Self {
Self {
next_tx: AtomicU64::new(next_tx),
committed_watermark: AtomicU64::new(committed_watermark),
next_lsn: AtomicU64::new(next_lsn),
active_txs: Mutex::new(HashMap::new()),
commit_mutex: Mutex::new(()),
store,
}
}
pub fn begin(&self) -> TxId {
let tx_id = self.next_tx.fetch_add(1, Ordering::SeqCst);
let mut active = self.active_txs.lock();
active.insert(tx_id, WriteSet::new());
tx_id
}
pub fn snapshot(&self) -> SnapshotId {
self.committed_watermark.load(Ordering::SeqCst)
}
pub fn with_write_set<F, R>(&self, tx: TxId, f: F) -> Result<R>
where
F: FnOnce(&mut WriteSet) -> R,
{
let mut active = self.active_txs.lock();
let ws = active.get_mut(&tx).ok_or(Error::TxNotFound(tx))?;
Ok(f(ws))
}
pub fn cloned_write_set(&self, tx: TxId) -> Result<WriteSet> {
let active = self.active_txs.lock();
active.get(&tx).cloned().ok_or(Error::TxNotFound(tx))
}
pub fn commit(&self, tx: TxId) -> Result<()> {
self.commit_with_lsn(tx).map(|_| ())
}
pub fn commit_with_lsn(&self, tx: TxId) -> Result<u64> {
let _lock = self.commit_mutex.lock();
let mut ws = self.cloned_write_set(tx)?;
let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
ws.stamp_lsn(lsn);
self.store.apply(ws)?;
let mut active = self.active_txs.lock();
active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
self.committed_watermark.fetch_max(tx, Ordering::SeqCst);
Ok(lsn)
}
pub fn rollback(&self, tx: TxId) -> Result<()> {
let mut active = self.active_txs.lock();
active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
Ok(())
}
pub fn store(&self) -> &S {
&self.store
}
pub fn new_row_id(&self) -> RowId {
self.store.new_row_id()
}
pub fn current_lsn(&self) -> u64 {
self.next_lsn.load(Ordering::SeqCst).saturating_sub(1)
}
pub fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
where
F: FnOnce(u64) -> R,
{
let _lock = self.commit_mutex.lock();
let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
f(lsn)
}
pub fn with_commit_lock<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let _lock = self.commit_mutex.lock();
f()
}
}
/// Exposes the inherent transaction API through the `contextdb_core::TransactionManager`
/// trait. Every method forwards to the inherent method of the same name.
impl<S> contextdb_core::TransactionManager for TxManager<S>
where
    S: WriteSetApplicator,
{
    /// Forwards to the inherent `begin`.
    fn begin(&self) -> TxId {
        TxManager::<S>::begin(self)
    }

    /// Forwards to the inherent `commit`.
    fn commit(&self, tx: TxId) -> Result<()> {
        TxManager::<S>::commit(self, tx)
    }

    /// Forwards to the inherent `rollback`.
    fn rollback(&self, tx: TxId) -> Result<()> {
        TxManager::<S>::rollback(self, tx)
    }

    /// Forwards to the inherent `snapshot`.
    fn snapshot(&self) -> SnapshotId {
        TxManager::<S>::snapshot(self)
    }
}