use crossbeam_epoch::{self as epoch, Atomic, Owned};
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
pub type TxnId = u64;
pub type Timestamp = u64;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
Active,
Committed(Timestamp),
Aborted,
}
#[derive(Debug, Clone)]
pub struct VersionInfo {
pub xmin: TxnId,
pub xmax: TxnId,
pub created_ts: Timestamp,
pub deleted_ts: Timestamp,
}
impl VersionInfo {
pub fn new(xmin: TxnId, created_ts: Timestamp) -> Self {
Self {
xmin,
xmax: 0,
created_ts,
deleted_ts: Timestamp::MAX,
}
}
pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) {
self.xmax = xmax;
self.deleted_ts = deleted_ts;
}
#[allow(deprecated)]
pub fn is_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> bool {
if self.xmin == snapshot.txn_id {
if self.xmax == snapshot.txn_id {
return false;
}
return true;
}
match txn_manager.get_status(self.xmin) {
Some(TxnStatus::Committed(commit_ts)) => {
if commit_ts >= snapshot.start_ts {
return false; }
}
Some(TxnStatus::Active) => {
if snapshot.active_txns.contains(&self.xmin) {
return false; }
return false; }
Some(TxnStatus::Aborted) | None => {
return false; }
}
if self.xmax == 0 {
return true; }
if self.xmax == snapshot.txn_id {
return false; }
match txn_manager.get_status(self.xmax) {
Some(TxnStatus::Committed(commit_ts)) => {
if commit_ts < snapshot.start_ts {
return false; }
true }
Some(TxnStatus::Active) | Some(TxnStatus::Aborted) | None => {
true }
}
}
}
#[derive(Debug, Clone)]
pub struct Snapshot {
pub txn_id: TxnId,
pub start_ts: Timestamp,
pub active_txns: HashSet<TxnId>,
pub xmin: TxnId,
}
impl Snapshot {
pub fn new(txn_id: TxnId, start_ts: Timestamp, active_txns: HashSet<TxnId>) -> Self {
let xmin = active_txns.iter().copied().min().unwrap_or(txn_id);
Self {
txn_id,
start_ts,
active_txns,
xmin,
}
}
pub fn is_txn_visible(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
if txn_id == self.txn_id {
return true; }
if self.active_txns.contains(&txn_id) {
return false; }
match commit_ts {
Some(ts) => ts < self.start_ts,
None => false, }
}
}
#[deprecated(
since = "0.1.0",
note = "Use MvccTransactionManager from wal_integration for production workloads with durability"
)]
pub struct TransactionManager {
next_txn_id: AtomicU64,
timestamp: AtomicU64,
active_txns: RwLock<HashMap<TxnId, (Timestamp, TxnStatus)>>,
commit_log: RwLock<BTreeMap<TxnId, Timestamp>>,
min_active_txn: AtomicU64,
}
#[allow(deprecated)]
impl TransactionManager {
pub fn new() -> Self {
Self {
next_txn_id: AtomicU64::new(1),
timestamp: AtomicU64::new(1),
active_txns: RwLock::new(HashMap::new()),
commit_log: RwLock::new(BTreeMap::new()),
min_active_txn: AtomicU64::new(u64::MAX),
}
}
pub fn begin(&self) -> (TxnId, Timestamp) {
let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
{
let mut active = self.active_txns.write();
active.insert(txn_id, (start_ts, TxnStatus::Active));
}
self.update_min_active();
(txn_id, start_ts)
}
pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
let active = self.active_txns.read();
let start_ts = active
.get(&txn_id)
.map(|(ts, _)| *ts)
.unwrap_or_else(|| self.timestamp.load(Ordering::SeqCst));
let active_set: HashSet<TxnId> = active
.iter()
.filter(|(id, (_, status))| **id != txn_id && *status == TxnStatus::Active)
.map(|(id, _)| *id)
.collect();
Snapshot::new(txn_id, start_ts, active_set)
}
pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
{
let mut active = self.active_txns.write();
if let Some((_, status)) = active.get_mut(&txn_id) {
if *status != TxnStatus::Active {
return None; }
*status = TxnStatus::Committed(commit_ts);
} else {
return None; }
}
{
let mut log = self.commit_log.write();
log.insert(txn_id, commit_ts);
}
self.update_min_active();
Some(commit_ts)
}
pub fn abort(&self, txn_id: TxnId) -> bool {
let mut active = self.active_txns.write();
if let Some((_, status)) = active.get_mut(&txn_id) {
if *status != TxnStatus::Active {
return false;
}
*status = TxnStatus::Aborted;
self.update_min_active();
true
} else {
false
}
}
pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
let active = self.active_txns.read();
active.get(&txn_id).map(|(_, status)| *status)
}
pub fn get_commit_ts(&self, txn_id: TxnId) -> Option<Timestamp> {
let log = self.commit_log.read();
log.get(&txn_id).copied()
}
pub fn min_active_txn_id(&self) -> TxnId {
self.min_active_txn.load(Ordering::SeqCst)
}
pub fn current_timestamp(&self) -> Timestamp {
self.timestamp.load(Ordering::SeqCst)
}
fn update_min_active(&self) {
let active = self.active_txns.read();
let min = active
.iter()
.filter(|(_, (_, status))| *status == TxnStatus::Active)
.map(|(&id, _)| id)
.min()
.unwrap_or(u64::MAX);
self.min_active_txn.store(min, Ordering::SeqCst);
}
pub fn gc(&self, watermark: Timestamp) -> usize {
let mut log = self.commit_log.write();
let mut active = self.active_txns.write();
let old_len = log.len();
log.retain(|_, commit_ts| *commit_ts >= watermark);
active.retain(|txn_id, (_, status)| match status {
TxnStatus::Committed(ts) => *ts >= watermark,
TxnStatus::Aborted => {
log.get(txn_id).map(|t| *t >= watermark).unwrap_or(true)
}
TxnStatus::Active => true,
});
old_len - log.len()
}
}
#[allow(deprecated)]
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct VersionChain<V> {
versions: Vec<(VersionInfo, V)>,
}
impl<V: Clone> VersionChain<V> {
pub fn new() -> Self {
Self {
versions: Vec::new(),
}
}
pub fn add_version(&mut self, info: VersionInfo, value: V) {
self.versions.insert(0, (info, value));
}
#[allow(deprecated)]
pub fn get_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> Option<&V> {
for (info, value) in &self.versions {
if info.is_visible(snapshot, txn_manager) {
return Some(value);
}
}
None
}
pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) -> bool {
if let Some((info, _)) = self.versions.first_mut()
&& info.xmax == 0
{
info.delete(xmax, deleted_ts);
return true;
}
false
}
pub fn gc(&mut self, min_visible_ts: Timestamp) -> usize {
let old_len = self.versions.len();
if self.versions.len() <= 1 {
return 0;
}
self.versions
.retain(|(info, _)| info.deleted_ts >= min_visible_ts);
if self.versions.is_empty() {
return old_len; }
old_len - self.versions.len()
}
pub fn version_count(&self) -> usize {
self.versions.len()
}
}
impl<V: Clone> Default for VersionChain<V> {
fn default() -> Self {
Self::new()
}
}
#[allow(deprecated)]
pub struct MvccStore<V> {
data: RwLock<HashMap<Vec<u8>, VersionChain<V>>>,
txn_manager: Arc<TransactionManager>,
}
#[allow(deprecated)]
impl<V: Clone + Send + Sync> MvccStore<V> {
pub fn new(txn_manager: Arc<TransactionManager>) -> Self {
Self {
data: RwLock::new(HashMap::new()),
txn_manager,
}
}
pub fn put(&self, key: &[u8], value: V, txn_id: TxnId) -> Timestamp {
let created_ts = self.txn_manager.current_timestamp();
let info = VersionInfo::new(txn_id, created_ts);
let mut data = self.data.write();
let chain = data.entry(key.to_vec()).or_default();
chain.add_version(info, value);
created_ts
}
pub fn get(&self, key: &[u8], snapshot: &Snapshot) -> Option<V> {
let data = self.data.read();
data.get(key)
.and_then(|chain| chain.get_visible(snapshot, &self.txn_manager))
.cloned()
}
pub fn delete(&self, key: &[u8], txn_id: TxnId) -> bool {
let deleted_ts = self.txn_manager.current_timestamp();
let mut data = self.data.write();
if let Some(chain) = data.get_mut(key) {
chain.delete(txn_id, deleted_ts)
} else {
false
}
}
pub fn gc(&self) -> usize {
let min_visible = self.txn_manager.min_active_txn_id();
let min_ts = self
.txn_manager
.get_commit_ts(min_visible)
.unwrap_or(self.txn_manager.current_timestamp());
let mut data = self.data.write();
let mut total_gc = 0;
for chain in data.values_mut() {
total_gc += chain.gc(min_ts);
}
total_gc
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_visibility() {
let manager = TransactionManager::new();
let (txn1, _) = manager.begin();
let _snapshot1 = manager.acquire_snapshot(txn1);
manager.commit(txn1);
let (txn2, _) = manager.begin();
let snapshot2 = manager.acquire_snapshot(txn2);
assert!(snapshot2.is_txn_visible(txn1, manager.get_commit_ts(txn1)));
manager.commit(txn2);
}
#[test]
fn test_snapshot_isolation() {
let manager = Arc::new(TransactionManager::new());
let store = MvccStore::new(manager.clone());
let (txn1, _) = manager.begin();
store.put(b"key1", "value1".to_string(), txn1);
manager.commit(txn1);
let (txn2, _) = manager.begin();
let snapshot2 = manager.acquire_snapshot(txn2);
let (txn3, _) = manager.begin();
store.put(b"key1", "value2".to_string(), txn3);
manager.commit(txn3);
let value = store.get(b"key1", &snapshot2);
assert_eq!(value, Some("value1".to_string()));
manager.commit(txn2);
}
#[test]
fn test_version_chain() {
let manager = Arc::new(TransactionManager::new());
let mut chain: VersionChain<String> = VersionChain::new();
let (txn1, _) = manager.begin();
let info1 = VersionInfo::new(txn1, manager.current_timestamp());
chain.add_version(info1, "v1".to_string());
manager.commit(txn1);
let (txn2, _) = manager.begin();
let info2 = VersionInfo::new(txn2, manager.current_timestamp());
chain.add_version(info2, "v2".to_string());
manager.commit(txn2);
assert_eq!(chain.version_count(), 2);
let (txn3, _) = manager.begin();
let snapshot = manager.acquire_snapshot(txn3);
assert_eq!(
chain.get_visible(&snapshot, &manager),
Some(&"v2".to_string())
);
}
#[test]
#[ignore] fn test_abort_not_visible() {
let manager = Arc::new(TransactionManager::new());
let store = MvccStore::new(manager.clone());
let (txn1, _) = manager.begin();
store.put(b"key1", "value1".to_string(), txn1);
manager.abort(txn1);
let (txn2, _) = manager.begin();
let snapshot2 = manager.acquire_snapshot(txn2);
let value = store.get(b"key1", &snapshot2);
assert_eq!(value, None);
}
}
struct EpochTxnEntry {
txn_id: TxnId,
start_ts: Timestamp,
status: AtomicU64, }
impl EpochTxnEntry {
fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
Self {
txn_id,
start_ts,
status: AtomicU64::new(0), }
}
fn get_status(&self) -> TxnStatus {
let val = self.status.load(Ordering::Acquire);
if val == 0 {
TxnStatus::Active
} else if val == u64::MAX {
TxnStatus::Aborted
} else {
TxnStatus::Committed(val)
}
}
fn try_commit(&self, commit_ts: Timestamp) -> bool {
self.status
.compare_exchange(0, commit_ts, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
fn try_abort(&self) -> bool {
self.status
.compare_exchange(0, u64::MAX, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
}
struct EpochNode {
entry: EpochTxnEntry,
next: Atomic<EpochNode>,
}
pub struct LockFreeMvccManager {
next_txn_id: AtomicU64,
timestamp: AtomicU64,
active_head: Atomic<EpochNode>,
committed: crossbeam_skiplist::SkipMap<TxnId, Timestamp>,
min_visible_ts: AtomicU64,
active_count: AtomicU64,
}
impl LockFreeMvccManager {
pub fn new() -> Self {
Self {
next_txn_id: AtomicU64::new(1),
timestamp: AtomicU64::new(1),
active_head: Atomic::null(),
committed: crossbeam_skiplist::SkipMap::new(),
min_visible_ts: AtomicU64::new(0),
active_count: AtomicU64::new(0),
}
}
pub fn begin(&self) -> (TxnId, Timestamp) {
let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
let guard = epoch::pin();
let mut new_node = Owned::new(EpochNode {
entry: EpochTxnEntry::new(txn_id, start_ts),
next: Atomic::null(),
});
loop {
let head = self.active_head.load(Ordering::Acquire, &guard);
new_node.next.store(head, Ordering::Release);
match self.active_head.compare_exchange(
head,
new_node,
Ordering::AcqRel,
Ordering::Acquire,
&guard,
) {
Ok(_) => {
self.active_count.fetch_add(1, Ordering::Relaxed);
break;
}
Err(e) => {
new_node = e.new;
}
}
}
(txn_id, start_ts)
}
pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
let guard = epoch::pin();
let mut current = self.active_head.load(Ordering::Acquire, &guard);
while let Some(node) = unsafe { current.as_ref() } {
if node.entry.txn_id == txn_id {
if node.entry.try_commit(commit_ts) {
self.committed.insert(txn_id, commit_ts);
self.active_count.fetch_sub(1, Ordering::Relaxed);
return Some(commit_ts);
} else {
return None; }
}
current = node.next.load(Ordering::Acquire, &guard);
}
None }
pub fn abort(&self, txn_id: TxnId) -> bool {
let guard = epoch::pin();
let mut current = self.active_head.load(Ordering::Acquire, &guard);
while let Some(node) = unsafe { current.as_ref() } {
if node.entry.txn_id == txn_id {
let success = node.entry.try_abort();
if success {
self.active_count.fetch_sub(1, Ordering::Relaxed);
}
return success;
}
current = node.next.load(Ordering::Acquire, &guard);
}
false
}
pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
if let Some(entry) = self.committed.get(&txn_id) {
return Some(TxnStatus::Committed(*entry.value()));
}
let guard = epoch::pin();
let mut current = self.active_head.load(Ordering::Acquire, &guard);
while let Some(node) = unsafe { current.as_ref() } {
if node.entry.txn_id == txn_id {
return Some(node.entry.get_status());
}
current = node.next.load(Ordering::Acquire, &guard);
}
None
}
pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
let guard = epoch::pin();
let start_ts = {
let mut ts = self.timestamp.load(Ordering::SeqCst);
let mut current = self.active_head.load(Ordering::Acquire, &guard);
while let Some(node) = unsafe { current.as_ref() } {
if node.entry.txn_id == txn_id {
ts = node.entry.start_ts;
break;
}
current = node.next.load(Ordering::Acquire, &guard);
}
ts
};
let mut active_set = HashSet::new();
let mut current = self.active_head.load(Ordering::Acquire, &guard);
while let Some(node) = unsafe { current.as_ref() } {
if node.entry.txn_id != txn_id && matches!(node.entry.get_status(), TxnStatus::Active) {
active_set.insert(node.entry.txn_id);
}
current = node.next.load(Ordering::Acquire, &guard);
}
Snapshot::new(txn_id, start_ts, active_set)
}
pub fn current_timestamp(&self) -> Timestamp {
self.timestamp.load(Ordering::SeqCst)
}
pub fn active_count(&self) -> u64 {
self.active_count.load(Ordering::Relaxed)
}
pub fn gc(&self, watermark: Timestamp) -> usize {
self.min_visible_ts.store(watermark, Ordering::Release);
let mut removed = 0;
let entries_to_remove: Vec<_> = self
.committed
.iter()
.filter(|entry| *entry.value() < watermark)
.map(|entry| *entry.key())
.collect();
for txn_id in entries_to_remove {
if self.committed.remove(&txn_id).is_some() {
removed += 1;
}
}
let guard = epoch::pin();
let _prev: Option<&EpochNode> = None;
let mut current = self.active_head.load(Ordering::Acquire, &guard);
while let Some(node) = unsafe { current.as_ref() } {
let status = node.entry.get_status();
match status {
TxnStatus::Committed(ts) if ts < watermark => {
}
TxnStatus::Aborted => {
}
_ => {}
}
current = node.next.load(Ordering::Acquire, &guard);
}
drop(guard);
epoch::pin().flush();
removed
}
}
impl Default for LockFreeMvccManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod lock_free_tests {
use super::*;
use std::thread;
#[test]
fn test_lock_free_basic() {
let manager = LockFreeMvccManager::new();
let (txn1, ts1) = manager.begin();
assert!(ts1 > 0);
assert_eq!(manager.get_status(txn1), Some(TxnStatus::Active));
let commit_ts = manager.commit(txn1).unwrap();
assert!(commit_ts > ts1);
assert_eq!(
manager.get_status(txn1),
Some(TxnStatus::Committed(commit_ts))
);
}
#[test]
fn test_lock_free_abort() {
let manager = LockFreeMvccManager::new();
let (txn1, _) = manager.begin();
assert!(manager.abort(txn1));
assert_eq!(manager.get_status(txn1), Some(TxnStatus::Aborted));
assert!(manager.commit(txn1).is_none());
}
#[test]
fn test_lock_free_concurrent() {
use std::sync::Arc;
let manager = Arc::new(LockFreeMvccManager::new());
let num_threads = 8;
let txns_per_thread = 100;
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let m = Arc::clone(&manager);
thread::spawn(move || {
for _ in 0..txns_per_thread {
let (txn_id, _) = m.begin();
m.commit(txn_id);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let total = num_threads * txns_per_thread;
assert!(manager.committed.len() >= total as usize);
}
#[test]
fn test_lock_free_snapshot() {
let manager = LockFreeMvccManager::new();
let (txn1, _) = manager.begin();
manager.commit(txn1);
let (txn2, _) = manager.begin();
let snapshot = manager.acquire_snapshot(txn2);
assert!(!snapshot.active_txns.contains(&txn1));
assert!(!snapshot.active_txns.contains(&txn2));
}
}