1use crossbeam_epoch::{self as epoch, Atomic, Owned};
58use parking_lot::RwLock;
59use std::collections::{BTreeMap, HashMap, HashSet};
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU64, Ordering};
62
/// Identifier handed out to each transaction.
pub type TxnId = u64;

/// Value of the logical clock used for start, commit and version timestamps.
pub type Timestamp = u64;

/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Started and not yet finished.
    Active,
    /// Committed; carries the commit timestamp.
    Committed(Timestamp),
    /// Rolled back.
    Aborted,
}
79
80#[derive(Debug, Clone)]
82pub struct VersionInfo {
83 pub xmin: TxnId,
85 pub xmax: TxnId,
87 pub created_ts: Timestamp,
89 pub deleted_ts: Timestamp,
91}
92
93impl VersionInfo {
94 pub fn new(xmin: TxnId, created_ts: Timestamp) -> Self {
96 Self {
97 xmin,
98 xmax: 0,
99 created_ts,
100 deleted_ts: Timestamp::MAX,
101 }
102 }
103
104 pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) {
106 self.xmax = xmax;
107 self.deleted_ts = deleted_ts;
108 }
109
110 #[allow(deprecated)]
112 pub fn is_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> bool {
113 if self.xmin == snapshot.txn_id {
115 if self.xmax == snapshot.txn_id {
117 return false;
119 }
120 return true;
121 }
122
123 match txn_manager.get_status(self.xmin) {
125 Some(TxnStatus::Committed(commit_ts)) => {
126 if commit_ts >= snapshot.start_ts {
127 return false; }
129 }
130 Some(TxnStatus::Active) => {
131 if snapshot.active_txns.contains(&self.xmin) {
133 return false; }
135 return false; }
137 Some(TxnStatus::Aborted) | None => {
138 return false; }
140 }
141
142 if self.xmax == 0 {
144 return true; }
146
147 if self.xmax == snapshot.txn_id {
148 return false; }
150
151 match txn_manager.get_status(self.xmax) {
153 Some(TxnStatus::Committed(commit_ts)) => {
154 if commit_ts < snapshot.start_ts {
155 return false; }
157 true }
159 Some(TxnStatus::Active) | Some(TxnStatus::Aborted) | None => {
160 true }
162 }
163 }
164}
165
166#[derive(Debug, Clone)]
168pub struct Snapshot {
169 pub txn_id: TxnId,
171 pub start_ts: Timestamp,
173 pub active_txns: HashSet<TxnId>,
175 pub xmin: TxnId,
177}
178
179impl Snapshot {
180 pub fn new(txn_id: TxnId, start_ts: Timestamp, active_txns: HashSet<TxnId>) -> Self {
182 let xmin = active_txns.iter().copied().min().unwrap_or(txn_id);
183 Self {
184 txn_id,
185 start_ts,
186 active_txns,
187 xmin,
188 }
189 }
190
191 pub fn is_txn_visible(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
193 if txn_id == self.txn_id {
194 return true; }
196
197 if self.active_txns.contains(&txn_id) {
198 return false; }
200
201 match commit_ts {
202 Some(ts) => ts < self.start_ts,
203 None => false, }
205 }
206}
207
208#[deprecated(
237 since = "0.1.0",
238 note = "Use MvccTransactionManager from wal_integration for production workloads with durability"
239)]
240pub struct TransactionManager {
241 next_txn_id: AtomicU64,
243 timestamp: AtomicU64,
245 active_txns: RwLock<HashMap<TxnId, (Timestamp, TxnStatus)>>,
247 commit_log: RwLock<BTreeMap<TxnId, Timestamp>>,
249 min_active_txn: AtomicU64,
251}
252
253#[allow(deprecated)]
254impl TransactionManager {
255 pub fn new() -> Self {
257 Self {
258 next_txn_id: AtomicU64::new(1),
259 timestamp: AtomicU64::new(1),
260 active_txns: RwLock::new(HashMap::new()),
261 commit_log: RwLock::new(BTreeMap::new()),
262 min_active_txn: AtomicU64::new(u64::MAX),
263 }
264 }
265
266 pub fn begin(&self) -> (TxnId, Timestamp) {
268 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
269 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
270
271 {
272 let mut active = self.active_txns.write();
273 active.insert(txn_id, (start_ts, TxnStatus::Active));
274 }
275
276 self.update_min_active();
278
279 (txn_id, start_ts)
280 }
281
282 pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
284 let active = self.active_txns.read();
285
286 let start_ts = active
287 .get(&txn_id)
288 .map(|(ts, _)| *ts)
289 .unwrap_or_else(|| self.timestamp.load(Ordering::SeqCst));
290
291 let active_set: HashSet<TxnId> = active
292 .iter()
293 .filter(|(id, (_, status))| **id != txn_id && *status == TxnStatus::Active)
294 .map(|(id, _)| *id)
295 .collect();
296
297 Snapshot::new(txn_id, start_ts, active_set)
298 }
299
300 pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
302 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
303
304 {
305 let mut active = self.active_txns.write();
306 if let Some((_, status)) = active.get_mut(&txn_id) {
307 if *status != TxnStatus::Active {
308 return None; }
310 *status = TxnStatus::Committed(commit_ts);
311 } else {
312 return None; }
314 }
315
316 {
317 let mut log = self.commit_log.write();
318 log.insert(txn_id, commit_ts);
319 }
320
321 self.update_min_active();
323
324 Some(commit_ts)
325 }
326
327 pub fn abort(&self, txn_id: TxnId) -> bool {
329 let mut active = self.active_txns.write();
330
331 if let Some((_, status)) = active.get_mut(&txn_id) {
332 if *status != TxnStatus::Active {
333 return false;
334 }
335 *status = TxnStatus::Aborted;
336 self.update_min_active();
337 true
338 } else {
339 false
340 }
341 }
342
343 pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
345 let active = self.active_txns.read();
346 active.get(&txn_id).map(|(_, status)| *status)
347 }
348
349 pub fn get_commit_ts(&self, txn_id: TxnId) -> Option<Timestamp> {
351 let log = self.commit_log.read();
352 log.get(&txn_id).copied()
353 }
354
355 pub fn min_active_txn_id(&self) -> TxnId {
357 self.min_active_txn.load(Ordering::SeqCst)
358 }
359
360 pub fn current_timestamp(&self) -> Timestamp {
362 self.timestamp.load(Ordering::SeqCst)
363 }
364
365 fn update_min_active(&self) {
367 let active = self.active_txns.read();
368 let min = active
369 .iter()
370 .filter(|(_, (_, status))| *status == TxnStatus::Active)
371 .map(|(&id, _)| id)
372 .min()
373 .unwrap_or(u64::MAX);
374 self.min_active_txn.store(min, Ordering::SeqCst);
375 }
376
377 pub fn gc(&self, watermark: Timestamp) -> usize {
381 let mut log = self.commit_log.write();
382 let mut active = self.active_txns.write();
383
384 let old_len = log.len();
385
386 log.retain(|_, commit_ts| *commit_ts >= watermark);
388
389 active.retain(|txn_id, (_, status)| match status {
391 TxnStatus::Committed(ts) => *ts >= watermark,
392 TxnStatus::Aborted => {
393 log.get(txn_id).map(|t| *t >= watermark).unwrap_or(true)
395 }
396 TxnStatus::Active => true,
397 });
398
399 old_len - log.len()
400 }
401}
402
403#[allow(deprecated)]
404impl Default for TransactionManager {
405 fn default() -> Self {
406 Self::new()
407 }
408}
409
410#[derive(Debug)]
412pub struct VersionChain<V> {
413 versions: Vec<(VersionInfo, V)>,
415}
416
417impl<V: Clone> VersionChain<V> {
418 pub fn new() -> Self {
420 Self {
421 versions: Vec::new(),
422 }
423 }
424
425 pub fn add_version(&mut self, info: VersionInfo, value: V) {
427 self.versions.insert(0, (info, value));
429 }
430
431 #[allow(deprecated)]
433 pub fn get_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> Option<&V> {
434 for (info, value) in &self.versions {
435 if info.is_visible(snapshot, txn_manager) {
436 return Some(value);
437 }
438 }
439 None
440 }
441
442 pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) -> bool {
444 if let Some((info, _)) = self.versions.first_mut()
445 && info.xmax == 0
446 {
447 info.delete(xmax, deleted_ts);
448 return true;
449 }
450 false
451 }
452
453 pub fn gc(&mut self, min_visible_ts: Timestamp) -> usize {
455 let old_len = self.versions.len();
456
457 if self.versions.len() <= 1 {
459 return 0;
460 }
461
462 self.versions
463 .retain(|(info, _)| info.deleted_ts >= min_visible_ts);
464
465 if self.versions.is_empty() {
467 return old_len; }
469
470 old_len - self.versions.len()
471 }
472
473 pub fn version_count(&self) -> usize {
475 self.versions.len()
476 }
477}
478
479impl<V: Clone> Default for VersionChain<V> {
480 fn default() -> Self {
481 Self::new()
482 }
483}
484
485#[allow(deprecated)]
487pub struct MvccStore<V> {
488 data: RwLock<HashMap<Vec<u8>, VersionChain<V>>>,
490 txn_manager: Arc<TransactionManager>,
492}
493
494#[allow(deprecated)]
495impl<V: Clone + Send + Sync> MvccStore<V> {
496 pub fn new(txn_manager: Arc<TransactionManager>) -> Self {
498 Self {
499 data: RwLock::new(HashMap::new()),
500 txn_manager,
501 }
502 }
503
504 pub fn put(&self, key: &[u8], value: V, txn_id: TxnId) -> Timestamp {
506 let created_ts = self.txn_manager.current_timestamp();
507 let info = VersionInfo::new(txn_id, created_ts);
508
509 let mut data = self.data.write();
510 let chain = data.entry(key.to_vec()).or_default();
511 chain.add_version(info, value);
512
513 created_ts
514 }
515
516 pub fn get(&self, key: &[u8], snapshot: &Snapshot) -> Option<V> {
518 let data = self.data.read();
519 data.get(key)
520 .and_then(|chain| chain.get_visible(snapshot, &self.txn_manager))
521 .cloned()
522 }
523
524 pub fn delete(&self, key: &[u8], txn_id: TxnId) -> bool {
526 let deleted_ts = self.txn_manager.current_timestamp();
527 let mut data = self.data.write();
528
529 if let Some(chain) = data.get_mut(key) {
530 chain.delete(txn_id, deleted_ts)
531 } else {
532 false
533 }
534 }
535
536 pub fn gc(&self) -> usize {
538 let min_visible = self.txn_manager.min_active_txn_id();
539 let min_ts = self
540 .txn_manager
541 .get_commit_ts(min_visible)
542 .unwrap_or(self.txn_manager.current_timestamp());
543
544 let mut data = self.data.write();
545 let mut total_gc = 0;
546
547 for chain in data.values_mut() {
548 total_gc += chain.gc(min_ts);
549 }
550
551 total_gc
552 }
553}
554
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_visibility() {
        let mgr = TransactionManager::new();

        let (first, _) = mgr.begin();
        let _first_view = mgr.acquire_snapshot(first);
        mgr.commit(first);

        let (second, _) = mgr.begin();
        let second_view = mgr.acquire_snapshot(second);

        // A transaction that committed before `second` began must be visible to it.
        let first_commit = mgr.get_commit_ts(first);
        assert!(second_view.is_txn_visible(first, first_commit));

        mgr.commit(second);
    }

    #[test]
    fn test_snapshot_isolation() {
        let mgr = Arc::new(TransactionManager::new());
        let store = MvccStore::new(Arc::clone(&mgr));

        let (writer, _) = mgr.begin();
        store.put(b"key1", "value1".to_string(), writer);
        mgr.commit(writer);

        let (reader, _) = mgr.begin();
        let reader_view = mgr.acquire_snapshot(reader);

        let (overwriter, _) = mgr.begin();
        store.put(b"key1", "value2".to_string(), overwriter);
        mgr.commit(overwriter);

        // The reader's snapshot predates the overwrite, so it still sees value1.
        let seen = store.get(b"key1", &reader_view);
        assert_eq!(seen, Some("value1".to_string()));

        mgr.commit(reader);
    }

    #[test]
    fn test_version_chain() {
        let mgr = Arc::new(TransactionManager::new());
        let mut chain: VersionChain<String> = VersionChain::new();

        for label in ["v1", "v2"] {
            let (txn, _) = mgr.begin();
            let info = VersionInfo::new(txn, mgr.current_timestamp());
            chain.add_version(info, label.to_string());
            mgr.commit(txn);
        }

        assert_eq!(chain.version_count(), 2);

        let (reader, _) = mgr.begin();
        let view = mgr.acquire_snapshot(reader);
        assert_eq!(chain.get_visible(&view, &mgr), Some(&"v2".to_string()));
    }

    // NOTE(review): ignored without a recorded reason; check that
    // `TransactionManager::abort` completes here before re-enabling.
    #[test]
    #[ignore]
    fn test_abort_not_visible() {
        let mgr = Arc::new(TransactionManager::new());
        let store = MvccStore::new(Arc::clone(&mgr));

        let (writer, _) = mgr.begin();
        store.put(b"key1", "value1".to_string(), writer);
        mgr.abort(writer);

        let (reader, _) = mgr.begin();
        let view = mgr.acquire_snapshot(reader);

        assert_eq!(store.get(b"key1", &view), None);
    }
}
652
653struct EpochTxnEntry {
659 txn_id: TxnId,
660 start_ts: Timestamp,
661 status: AtomicU64, }
663
664impl EpochTxnEntry {
665 fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
666 Self {
667 txn_id,
668 start_ts,
669 status: AtomicU64::new(0), }
671 }
672
673 fn get_status(&self) -> TxnStatus {
674 let val = self.status.load(Ordering::Acquire);
675 if val == 0 {
676 TxnStatus::Active
677 } else if val == u64::MAX {
678 TxnStatus::Aborted
679 } else {
680 TxnStatus::Committed(val)
681 }
682 }
683
684 fn try_commit(&self, commit_ts: Timestamp) -> bool {
685 self.status
686 .compare_exchange(0, commit_ts, Ordering::AcqRel, Ordering::Acquire)
687 .is_ok()
688 }
689
690 fn try_abort(&self) -> bool {
691 self.status
692 .compare_exchange(0, u64::MAX, Ordering::AcqRel, Ordering::Acquire)
693 .is_ok()
694 }
695}
696
697struct EpochNode {
699 entry: EpochTxnEntry,
700 next: Atomic<EpochNode>,
701}
702
703pub struct LockFreeMvccManager {
716 next_txn_id: AtomicU64,
718 timestamp: AtomicU64,
720 active_head: Atomic<EpochNode>,
722 committed: crossbeam_skiplist::SkipMap<TxnId, Timestamp>,
724 min_visible_ts: AtomicU64,
726 active_count: AtomicU64,
728}
729
730impl LockFreeMvccManager {
731 pub fn new() -> Self {
733 Self {
734 next_txn_id: AtomicU64::new(1),
735 timestamp: AtomicU64::new(1),
736 active_head: Atomic::null(),
737 committed: crossbeam_skiplist::SkipMap::new(),
738 min_visible_ts: AtomicU64::new(0),
739 active_count: AtomicU64::new(0),
740 }
741 }
742
743 pub fn begin(&self) -> (TxnId, Timestamp) {
745 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
746 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
747
748 let guard = epoch::pin();
749
750 let mut new_node = Owned::new(EpochNode {
752 entry: EpochTxnEntry::new(txn_id, start_ts),
753 next: Atomic::null(),
754 });
755
756 loop {
757 let head = self.active_head.load(Ordering::Acquire, &guard);
758 new_node.next.store(head, Ordering::Release);
759
760 match self.active_head.compare_exchange(
761 head,
762 new_node,
763 Ordering::AcqRel,
764 Ordering::Acquire,
765 &guard,
766 ) {
767 Ok(_) => {
768 self.active_count.fetch_add(1, Ordering::Relaxed);
769 break;
770 }
771 Err(e) => {
772 new_node = e.new;
774 }
775 }
776 }
777
778 (txn_id, start_ts)
779 }
780
781 pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
783 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
784
785 let guard = epoch::pin();
787 let mut current = self.active_head.load(Ordering::Acquire, &guard);
788
789 while let Some(node) = unsafe { current.as_ref() } {
790 if node.entry.txn_id == txn_id {
791 if node.entry.try_commit(commit_ts) {
792 self.committed.insert(txn_id, commit_ts);
794 self.active_count.fetch_sub(1, Ordering::Relaxed);
795 return Some(commit_ts);
796 } else {
797 return None; }
799 }
800 current = node.next.load(Ordering::Acquire, &guard);
801 }
802
803 None }
805
806 pub fn abort(&self, txn_id: TxnId) -> bool {
808 let guard = epoch::pin();
809 let mut current = self.active_head.load(Ordering::Acquire, &guard);
810
811 while let Some(node) = unsafe { current.as_ref() } {
812 if node.entry.txn_id == txn_id {
813 let success = node.entry.try_abort();
814 if success {
815 self.active_count.fetch_sub(1, Ordering::Relaxed);
816 }
817 return success;
818 }
819 current = node.next.load(Ordering::Acquire, &guard);
820 }
821
822 false
823 }
824
825 pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
827 if let Some(entry) = self.committed.get(&txn_id) {
829 return Some(TxnStatus::Committed(*entry.value()));
830 }
831
832 let guard = epoch::pin();
834 let mut current = self.active_head.load(Ordering::Acquire, &guard);
835
836 while let Some(node) = unsafe { current.as_ref() } {
837 if node.entry.txn_id == txn_id {
838 return Some(node.entry.get_status());
839 }
840 current = node.next.load(Ordering::Acquire, &guard);
841 }
842
843 None
844 }
845
846 pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
848 let guard = epoch::pin();
849
850 let start_ts = {
852 let mut ts = self.timestamp.load(Ordering::SeqCst);
853 let mut current = self.active_head.load(Ordering::Acquire, &guard);
854
855 while let Some(node) = unsafe { current.as_ref() } {
856 if node.entry.txn_id == txn_id {
857 ts = node.entry.start_ts;
858 break;
859 }
860 current = node.next.load(Ordering::Acquire, &guard);
861 }
862 ts
863 };
864
865 let mut active_set = HashSet::new();
867 let mut current = self.active_head.load(Ordering::Acquire, &guard);
868
869 while let Some(node) = unsafe { current.as_ref() } {
870 if node.entry.txn_id != txn_id && matches!(node.entry.get_status(), TxnStatus::Active) {
871 active_set.insert(node.entry.txn_id);
872 }
873 current = node.next.load(Ordering::Acquire, &guard);
874 }
875
876 Snapshot::new(txn_id, start_ts, active_set)
877 }
878
879 pub fn current_timestamp(&self) -> Timestamp {
881 self.timestamp.load(Ordering::SeqCst)
882 }
883
884 pub fn active_count(&self) -> u64 {
886 self.active_count.load(Ordering::Relaxed)
887 }
888
889 pub fn gc(&self, watermark: Timestamp) -> usize {
895 self.min_visible_ts.store(watermark, Ordering::Release);
896
897 let mut removed = 0;
899 let entries_to_remove: Vec<_> = self
900 .committed
901 .iter()
902 .filter(|entry| *entry.value() < watermark)
903 .map(|entry| *entry.key())
904 .collect();
905
906 for txn_id in entries_to_remove {
907 if self.committed.remove(&txn_id).is_some() {
908 removed += 1;
909 }
910 }
911
912 let guard = epoch::pin();
915 let _prev: Option<&EpochNode> = None;
916 let mut current = self.active_head.load(Ordering::Acquire, &guard);
917
918 while let Some(node) = unsafe { current.as_ref() } {
919 let status = node.entry.get_status();
920 match status {
921 TxnStatus::Committed(ts) if ts < watermark => {
922 }
925 TxnStatus::Aborted => {
926 }
928 _ => {}
929 }
930 current = node.next.load(Ordering::Acquire, &guard);
931 }
932
933 drop(guard);
935 epoch::pin().flush();
936
937 removed
938 }
939}
940
941impl Default for LockFreeMvccManager {
942 fn default() -> Self {
943 Self::new()
944 }
945}
946
#[cfg(test)]
mod lock_free_tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_lock_free_basic() {
        let mgr = LockFreeMvccManager::new();

        let (id, begin_ts) = mgr.begin();
        assert!(begin_ts > 0);
        assert_eq!(mgr.get_status(id), Some(TxnStatus::Active));

        let committed_at = mgr.commit(id).unwrap();
        assert!(committed_at > begin_ts);
        assert_eq!(mgr.get_status(id), Some(TxnStatus::Committed(committed_at)));
    }

    #[test]
    fn test_lock_free_abort() {
        let mgr = LockFreeMvccManager::new();

        let (id, _) = mgr.begin();
        assert!(mgr.abort(id));
        assert_eq!(mgr.get_status(id), Some(TxnStatus::Aborted));

        // An aborted transaction can no longer commit.
        assert!(mgr.commit(id).is_none());
    }

    #[test]
    fn test_lock_free_concurrent() {
        use std::sync::Arc;

        const THREADS: usize = 8;
        const TXNS_PER_THREAD: usize = 100;

        let mgr = Arc::new(LockFreeMvccManager::new());

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let shared = Arc::clone(&mgr);
                thread::spawn(move || {
                    for _ in 0..TXNS_PER_THREAD {
                        let (id, _) = shared.begin();
                        shared.commit(id);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert!(mgr.committed.len() >= THREADS * TXNS_PER_THREAD);
    }

    #[test]
    fn test_lock_free_snapshot() {
        let mgr = LockFreeMvccManager::new();

        let (finished, _) = mgr.begin();
        mgr.commit(finished);

        let (reader, _) = mgr.begin();
        let view = mgr.acquire_snapshot(reader);

        // Neither the committed txn nor the snapshot owner counts as "other active".
        assert!(!view.active_txns.contains(&finished));
        assert!(!view.active_txns.contains(&reader));
    }
}