1use crossbeam_epoch::{self as epoch, Atomic, Owned};
61use parking_lot::RwLock;
62use std::collections::{BTreeMap, HashMap, HashSet};
63use std::sync::Arc;
64use std::sync::atomic::{AtomicU64, Ordering};
65
/// Identifier handed to a transaction when it begins.
pub type TxnId = u64;

/// Logical timestamp used for transaction start and commit points.
pub type Timestamp = u64;

/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// The transaction has begun and has neither committed nor aborted.
    Active,
    /// The transaction committed; the payload is its commit timestamp.
    Committed(Timestamp),
    /// The transaction was aborted.
    Aborted,
}
82
83#[derive(Debug, Clone)]
85pub struct VersionInfo {
86 pub xmin: TxnId,
88 pub xmax: TxnId,
90 pub created_ts: Timestamp,
92 pub deleted_ts: Timestamp,
94}
95
96impl VersionInfo {
97 pub fn new(xmin: TxnId, created_ts: Timestamp) -> Self {
99 Self {
100 xmin,
101 xmax: 0,
102 created_ts,
103 deleted_ts: Timestamp::MAX,
104 }
105 }
106
107 pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) {
109 self.xmax = xmax;
110 self.deleted_ts = deleted_ts;
111 }
112
113 #[allow(deprecated)]
115 pub fn is_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> bool {
116 if self.xmin == snapshot.txn_id {
118 if self.xmax == snapshot.txn_id {
120 return false;
122 }
123 return true;
124 }
125
126 match txn_manager.get_status(self.xmin) {
128 Some(TxnStatus::Committed(commit_ts)) => {
129 if commit_ts >= snapshot.start_ts {
130 return false; }
132 }
133 Some(TxnStatus::Active) => {
134 if snapshot.active_txns.contains(&self.xmin) {
136 return false; }
138 return false; }
140 Some(TxnStatus::Aborted) | None => {
141 return false; }
143 }
144
145 if self.xmax == 0 {
147 return true; }
149
150 if self.xmax == snapshot.txn_id {
151 return false; }
153
154 match txn_manager.get_status(self.xmax) {
156 Some(TxnStatus::Committed(commit_ts)) => {
157 if commit_ts < snapshot.start_ts {
158 return false; }
160 true }
162 Some(TxnStatus::Active) | Some(TxnStatus::Aborted) | None => {
163 true }
165 }
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct Snapshot {
172 pub txn_id: TxnId,
174 pub start_ts: Timestamp,
176 pub active_txns: HashSet<TxnId>,
178 pub xmin: TxnId,
180}
181
182impl Snapshot {
183 pub fn new(txn_id: TxnId, start_ts: Timestamp, active_txns: HashSet<TxnId>) -> Self {
185 let xmin = active_txns.iter().copied().min().unwrap_or(txn_id);
186 Self {
187 txn_id,
188 start_ts,
189 active_txns,
190 xmin,
191 }
192 }
193
194 pub fn is_txn_visible(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
196 if txn_id == self.txn_id {
197 return true; }
199
200 if self.active_txns.contains(&txn_id) {
201 return false; }
203
204 match commit_ts {
205 Some(ts) => ts < self.start_ts,
206 None => false, }
208 }
209}
210
211#[deprecated(
240 since = "0.1.0",
241 note = "Use MvccTransactionManager from wal_integration for production workloads with durability"
242)]
243pub struct TransactionManager {
244 next_txn_id: AtomicU64,
246 timestamp: AtomicU64,
248 active_txns: RwLock<HashMap<TxnId, (Timestamp, TxnStatus)>>,
250 commit_log: RwLock<BTreeMap<TxnId, Timestamp>>,
252 min_active_txn: AtomicU64,
254}
255
256#[allow(deprecated)]
257impl TransactionManager {
258 pub fn new() -> Self {
260 Self {
261 next_txn_id: AtomicU64::new(1),
262 timestamp: AtomicU64::new(1),
263 active_txns: RwLock::new(HashMap::new()),
264 commit_log: RwLock::new(BTreeMap::new()),
265 min_active_txn: AtomicU64::new(u64::MAX),
266 }
267 }
268
269 pub fn begin(&self) -> (TxnId, Timestamp) {
271 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
272 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
273
274 {
275 let mut active = self.active_txns.write();
276 active.insert(txn_id, (start_ts, TxnStatus::Active));
277 }
278
279 self.update_min_active();
281
282 (txn_id, start_ts)
283 }
284
285 pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
287 let active = self.active_txns.read();
288
289 let start_ts = active
290 .get(&txn_id)
291 .map(|(ts, _)| *ts)
292 .unwrap_or_else(|| self.timestamp.load(Ordering::SeqCst));
293
294 let active_set: HashSet<TxnId> = active
295 .iter()
296 .filter(|(id, (_, status))| **id != txn_id && *status == TxnStatus::Active)
297 .map(|(id, _)| *id)
298 .collect();
299
300 Snapshot::new(txn_id, start_ts, active_set)
301 }
302
303 pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
305 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
306
307 {
308 let mut active = self.active_txns.write();
309 if let Some((_, status)) = active.get_mut(&txn_id) {
310 if *status != TxnStatus::Active {
311 return None; }
313 *status = TxnStatus::Committed(commit_ts);
314 } else {
315 return None; }
317 }
318
319 {
320 let mut log = self.commit_log.write();
321 log.insert(txn_id, commit_ts);
322 }
323
324 self.update_min_active();
326
327 Some(commit_ts)
328 }
329
330 pub fn abort(&self, txn_id: TxnId) -> bool {
332 let mut active = self.active_txns.write();
333
334 if let Some((_, status)) = active.get_mut(&txn_id) {
335 if *status != TxnStatus::Active {
336 return false;
337 }
338 *status = TxnStatus::Aborted;
339 self.update_min_active();
340 true
341 } else {
342 false
343 }
344 }
345
346 pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
348 let active = self.active_txns.read();
349 active.get(&txn_id).map(|(_, status)| *status)
350 }
351
352 pub fn get_commit_ts(&self, txn_id: TxnId) -> Option<Timestamp> {
354 let log = self.commit_log.read();
355 log.get(&txn_id).copied()
356 }
357
358 pub fn min_active_txn_id(&self) -> TxnId {
360 self.min_active_txn.load(Ordering::SeqCst)
361 }
362
363 pub fn current_timestamp(&self) -> Timestamp {
365 self.timestamp.load(Ordering::SeqCst)
366 }
367
368 fn update_min_active(&self) {
370 let active = self.active_txns.read();
371 let min = active
372 .iter()
373 .filter(|(_, (_, status))| *status == TxnStatus::Active)
374 .map(|(&id, _)| id)
375 .min()
376 .unwrap_or(u64::MAX);
377 self.min_active_txn.store(min, Ordering::SeqCst);
378 }
379
380 pub fn gc(&self, watermark: Timestamp) -> usize {
384 let mut log = self.commit_log.write();
385 let mut active = self.active_txns.write();
386
387 let old_len = log.len();
388
389 log.retain(|_, commit_ts| *commit_ts >= watermark);
391
392 active.retain(|txn_id, (_, status)| match status {
394 TxnStatus::Committed(ts) => *ts >= watermark,
395 TxnStatus::Aborted => {
396 log.get(txn_id).map(|t| *t >= watermark).unwrap_or(true)
398 }
399 TxnStatus::Active => true,
400 });
401
402 old_len - log.len()
403 }
404}
405
406#[allow(deprecated)]
407impl Default for TransactionManager {
408 fn default() -> Self {
409 Self::new()
410 }
411}
412
413#[derive(Debug)]
415pub struct VersionChain<V> {
416 versions: Vec<(VersionInfo, V)>,
418}
419
420impl<V: Clone> VersionChain<V> {
421 pub fn new() -> Self {
423 Self {
424 versions: Vec::new(),
425 }
426 }
427
428 pub fn add_version(&mut self, info: VersionInfo, value: V) {
430 self.versions.insert(0, (info, value));
432 }
433
434 #[allow(deprecated)]
436 pub fn get_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> Option<&V> {
437 for (info, value) in &self.versions {
438 if info.is_visible(snapshot, txn_manager) {
439 return Some(value);
440 }
441 }
442 None
443 }
444
445 pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) -> bool {
447 if let Some((info, _)) = self.versions.first_mut()
448 && info.xmax == 0
449 {
450 info.delete(xmax, deleted_ts);
451 return true;
452 }
453 false
454 }
455
456 pub fn gc(&mut self, min_visible_ts: Timestamp) -> usize {
458 let old_len = self.versions.len();
459
460 if self.versions.len() <= 1 {
462 return 0;
463 }
464
465 self.versions
466 .retain(|(info, _)| info.deleted_ts >= min_visible_ts);
467
468 if self.versions.is_empty() {
470 return old_len; }
472
473 old_len - self.versions.len()
474 }
475
476 pub fn version_count(&self) -> usize {
478 self.versions.len()
479 }
480}
481
482impl<V: Clone> Default for VersionChain<V> {
483 fn default() -> Self {
484 Self::new()
485 }
486}
487
488#[allow(deprecated)]
490pub struct MvccStore<V> {
491 data: RwLock<HashMap<Vec<u8>, VersionChain<V>>>,
493 txn_manager: Arc<TransactionManager>,
495}
496
497#[allow(deprecated)]
498impl<V: Clone + Send + Sync> MvccStore<V> {
499 pub fn new(txn_manager: Arc<TransactionManager>) -> Self {
501 Self {
502 data: RwLock::new(HashMap::new()),
503 txn_manager,
504 }
505 }
506
507 pub fn put(&self, key: &[u8], value: V, txn_id: TxnId) -> Timestamp {
509 let created_ts = self.txn_manager.current_timestamp();
510 let info = VersionInfo::new(txn_id, created_ts);
511
512 let mut data = self.data.write();
513 let chain = data.entry(key.to_vec()).or_default();
514 chain.add_version(info, value);
515
516 created_ts
517 }
518
519 pub fn get(&self, key: &[u8], snapshot: &Snapshot) -> Option<V> {
521 let data = self.data.read();
522 data.get(key)
523 .and_then(|chain| chain.get_visible(snapshot, &self.txn_manager))
524 .cloned()
525 }
526
527 pub fn delete(&self, key: &[u8], txn_id: TxnId) -> bool {
529 let deleted_ts = self.txn_manager.current_timestamp();
530 let mut data = self.data.write();
531
532 if let Some(chain) = data.get_mut(key) {
533 chain.delete(txn_id, deleted_ts)
534 } else {
535 false
536 }
537 }
538
539 pub fn gc(&self) -> usize {
541 let min_visible = self.txn_manager.min_active_txn_id();
542 let min_ts = self
543 .txn_manager
544 .get_commit_ts(min_visible)
545 .unwrap_or(self.txn_manager.current_timestamp());
546
547 let mut data = self.data.write();
548 let mut total_gc = 0;
549
550 for chain in data.values_mut() {
551 total_gc += chain.gc(min_ts);
552 }
553
554 total_gc
555 }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// A transaction that committed before a later one began is visible to
    /// the later transaction's snapshot.
    #[test]
    fn test_basic_visibility() {
        let txn_mgr = TransactionManager::new();

        let (writer, _) = txn_mgr.begin();
        let _writer_snapshot = txn_mgr.acquire_snapshot(writer);
        txn_mgr.commit(writer);

        let (reader, _) = txn_mgr.begin();
        let reader_snapshot = txn_mgr.acquire_snapshot(reader);

        let writer_commit_ts = txn_mgr.get_commit_ts(writer);
        assert!(reader_snapshot.is_txn_visible(writer, writer_commit_ts));

        txn_mgr.commit(reader);
    }

    /// A snapshot keeps reading the value that was committed before it began,
    /// even after a later transaction overwrites the key and commits.
    #[test]
    fn test_snapshot_isolation() {
        let txn_mgr = Arc::new(TransactionManager::new());
        let kv = MvccStore::new(txn_mgr.clone());

        let (first_writer, _) = txn_mgr.begin();
        kv.put(b"key1", "value1".to_string(), first_writer);
        txn_mgr.commit(first_writer);

        let (reader, _) = txn_mgr.begin();
        let reader_snapshot = txn_mgr.acquire_snapshot(reader);

        let (second_writer, _) = txn_mgr.begin();
        kv.put(b"key1", "value2".to_string(), second_writer);
        txn_mgr.commit(second_writer);

        let seen = kv.get(b"key1", &reader_snapshot);
        assert_eq!(seen, Some("value1".to_string()));

        txn_mgr.commit(reader);
    }

    /// With two committed versions in a chain, a fresh snapshot resolves to
    /// the newer one.
    #[test]
    fn test_version_chain() {
        let txn_mgr = Arc::new(TransactionManager::new());

        let mut versions: VersionChain<String> = VersionChain::new();

        let (older, _) = txn_mgr.begin();
        let older_info = VersionInfo::new(older, txn_mgr.current_timestamp());
        versions.add_version(older_info, "v1".to_string());
        txn_mgr.commit(older);

        let (newer, _) = txn_mgr.begin();
        let newer_info = VersionInfo::new(newer, txn_mgr.current_timestamp());
        versions.add_version(newer_info, "v2".to_string());
        txn_mgr.commit(newer);

        assert_eq!(versions.version_count(), 2);

        let (reader, _) = txn_mgr.begin();
        let reader_snapshot = txn_mgr.acquire_snapshot(reader);
        assert_eq!(
            versions.get_visible(&reader_snapshot, &txn_mgr),
            Some(&"v2".to_string())
        );
    }

    /// A value written by a transaction that then aborts must not be readable.
    ///
    /// NOTE(review): ignored by default, presumably because of a problem on
    /// the abort path — confirm it passes before removing `#[ignore]`.
    #[test]
    #[ignore]
    fn test_abort_not_visible() {
        let txn_mgr = Arc::new(TransactionManager::new());
        let kv = MvccStore::new(txn_mgr.clone());

        let (aborted_writer, _) = txn_mgr.begin();
        kv.put(b"key1", "value1".to_string(), aborted_writer);
        txn_mgr.abort(aborted_writer);

        let (reader, _) = txn_mgr.begin();
        let reader_snapshot = txn_mgr.acquire_snapshot(reader);

        let seen = kv.get(b"key1", &reader_snapshot);
        assert_eq!(seen, None);
    }
}
655
656struct EpochTxnEntry {
662 txn_id: TxnId,
663 start_ts: Timestamp,
664 status: AtomicU64, }
666
667impl EpochTxnEntry {
668 fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
669 Self {
670 txn_id,
671 start_ts,
672 status: AtomicU64::new(0), }
674 }
675
676 fn get_status(&self) -> TxnStatus {
677 let val = self.status.load(Ordering::Acquire);
678 if val == 0 {
679 TxnStatus::Active
680 } else if val == u64::MAX {
681 TxnStatus::Aborted
682 } else {
683 TxnStatus::Committed(val)
684 }
685 }
686
687 fn try_commit(&self, commit_ts: Timestamp) -> bool {
688 self.status
689 .compare_exchange(0, commit_ts, Ordering::AcqRel, Ordering::Acquire)
690 .is_ok()
691 }
692
693 fn try_abort(&self) -> bool {
694 self.status
695 .compare_exchange(0, u64::MAX, Ordering::AcqRel, Ordering::Acquire)
696 .is_ok()
697 }
698}
699
700struct EpochNode {
702 entry: EpochTxnEntry,
703 next: Atomic<EpochNode>,
704}
705
706pub struct LockFreeMvccManager {
719 next_txn_id: AtomicU64,
721 timestamp: AtomicU64,
723 active_head: Atomic<EpochNode>,
725 committed: crossbeam_skiplist::SkipMap<TxnId, Timestamp>,
727 min_visible_ts: AtomicU64,
729 active_count: AtomicU64,
731}
732
733impl LockFreeMvccManager {
734 pub fn new() -> Self {
736 Self {
737 next_txn_id: AtomicU64::new(1),
738 timestamp: AtomicU64::new(1),
739 active_head: Atomic::null(),
740 committed: crossbeam_skiplist::SkipMap::new(),
741 min_visible_ts: AtomicU64::new(0),
742 active_count: AtomicU64::new(0),
743 }
744 }
745
746 pub fn begin(&self) -> (TxnId, Timestamp) {
748 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
749 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
750
751 let guard = epoch::pin();
752
753 let mut new_node = Owned::new(EpochNode {
755 entry: EpochTxnEntry::new(txn_id, start_ts),
756 next: Atomic::null(),
757 });
758
759 loop {
760 let head = self.active_head.load(Ordering::Acquire, &guard);
761 new_node.next.store(head, Ordering::Release);
762
763 match self.active_head.compare_exchange(
764 head,
765 new_node,
766 Ordering::AcqRel,
767 Ordering::Acquire,
768 &guard,
769 ) {
770 Ok(_) => {
771 self.active_count.fetch_add(1, Ordering::Relaxed);
772 break;
773 }
774 Err(e) => {
775 new_node = e.new;
777 }
778 }
779 }
780
781 (txn_id, start_ts)
782 }
783
784 pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
786 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
787
788 let guard = epoch::pin();
790 let mut current = self.active_head.load(Ordering::Acquire, &guard);
791
792 while let Some(node) = unsafe { current.as_ref() } {
793 if node.entry.txn_id == txn_id {
794 if node.entry.try_commit(commit_ts) {
795 self.committed.insert(txn_id, commit_ts);
797 self.active_count.fetch_sub(1, Ordering::Relaxed);
798 return Some(commit_ts);
799 } else {
800 return None; }
802 }
803 current = node.next.load(Ordering::Acquire, &guard);
804 }
805
806 None }
808
809 pub fn abort(&self, txn_id: TxnId) -> bool {
811 let guard = epoch::pin();
812 let mut current = self.active_head.load(Ordering::Acquire, &guard);
813
814 while let Some(node) = unsafe { current.as_ref() } {
815 if node.entry.txn_id == txn_id {
816 let success = node.entry.try_abort();
817 if success {
818 self.active_count.fetch_sub(1, Ordering::Relaxed);
819 }
820 return success;
821 }
822 current = node.next.load(Ordering::Acquire, &guard);
823 }
824
825 false
826 }
827
828 pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
830 if let Some(entry) = self.committed.get(&txn_id) {
832 return Some(TxnStatus::Committed(*entry.value()));
833 }
834
835 let guard = epoch::pin();
837 let mut current = self.active_head.load(Ordering::Acquire, &guard);
838
839 while let Some(node) = unsafe { current.as_ref() } {
840 if node.entry.txn_id == txn_id {
841 return Some(node.entry.get_status());
842 }
843 current = node.next.load(Ordering::Acquire, &guard);
844 }
845
846 None
847 }
848
849 pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
851 let guard = epoch::pin();
852
853 let start_ts = {
855 let mut ts = self.timestamp.load(Ordering::SeqCst);
856 let mut current = self.active_head.load(Ordering::Acquire, &guard);
857
858 while let Some(node) = unsafe { current.as_ref() } {
859 if node.entry.txn_id == txn_id {
860 ts = node.entry.start_ts;
861 break;
862 }
863 current = node.next.load(Ordering::Acquire, &guard);
864 }
865 ts
866 };
867
868 let mut active_set = HashSet::new();
870 let mut current = self.active_head.load(Ordering::Acquire, &guard);
871
872 while let Some(node) = unsafe { current.as_ref() } {
873 if node.entry.txn_id != txn_id && matches!(node.entry.get_status(), TxnStatus::Active) {
874 active_set.insert(node.entry.txn_id);
875 }
876 current = node.next.load(Ordering::Acquire, &guard);
877 }
878
879 Snapshot::new(txn_id, start_ts, active_set)
880 }
881
882 pub fn current_timestamp(&self) -> Timestamp {
884 self.timestamp.load(Ordering::SeqCst)
885 }
886
887 pub fn active_count(&self) -> u64 {
889 self.active_count.load(Ordering::Relaxed)
890 }
891
892 pub fn gc(&self, watermark: Timestamp) -> usize {
898 self.min_visible_ts.store(watermark, Ordering::Release);
899
900 let mut removed = 0;
902 let entries_to_remove: Vec<_> = self
903 .committed
904 .iter()
905 .filter(|entry| *entry.value() < watermark)
906 .map(|entry| *entry.key())
907 .collect();
908
909 for txn_id in entries_to_remove {
910 if self.committed.remove(&txn_id).is_some() {
911 removed += 1;
912 }
913 }
914
915 let guard = epoch::pin();
918 let _prev: Option<&EpochNode> = None;
919 let mut current = self.active_head.load(Ordering::Acquire, &guard);
920
921 while let Some(node) = unsafe { current.as_ref() } {
922 let status = node.entry.get_status();
923 match status {
924 TxnStatus::Committed(ts) if ts < watermark => {
925 }
928 TxnStatus::Aborted => {
929 }
931 _ => {}
932 }
933 current = node.next.load(Ordering::Acquire, &guard);
934 }
935
936 drop(guard);
938 epoch::pin().flush();
939
940 removed
941 }
942}
943
944impl Default for LockFreeMvccManager {
945 fn default() -> Self {
946 Self::new()
947 }
948}
949
#[cfg(test)]
mod lock_free_tests {
    use super::*;
    use std::thread;

    /// A transaction is active after `begin` and committed, with a later
    /// timestamp, after `commit`.
    #[test]
    fn test_lock_free_basic() {
        let mgr = LockFreeMvccManager::new();

        let (txn, start_ts) = mgr.begin();
        assert!(start_ts > 0);
        assert_eq!(mgr.get_status(txn), Some(TxnStatus::Active));

        let commit_ts = mgr.commit(txn).unwrap();
        assert!(commit_ts > start_ts);
        assert_eq!(
            mgr.get_status(txn),
            Some(TxnStatus::Committed(commit_ts))
        );
    }

    /// An aborted transaction reports `Aborted` and can no longer commit.
    #[test]
    fn test_lock_free_abort() {
        let mgr = LockFreeMvccManager::new();

        let (txn, _) = mgr.begin();
        assert!(mgr.abort(txn));
        assert_eq!(mgr.get_status(txn), Some(TxnStatus::Aborted));

        assert!(mgr.commit(txn).is_none());
    }

    /// Several threads beginning and committing concurrently all end up in
    /// the committed index.
    #[test]
    fn test_lock_free_concurrent() {
        use std::sync::Arc;

        let mgr = Arc::new(LockFreeMvccManager::new());
        let num_threads = 8;
        let txns_per_thread = 100;

        let mut workers = Vec::new();
        for _ in 0..num_threads {
            let shared = Arc::clone(&mgr);
            workers.push(thread::spawn(move || {
                for _ in 0..txns_per_thread {
                    let (txn_id, _) = shared.begin();
                    shared.commit(txn_id);
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        let total = num_threads * txns_per_thread;
        assert!(mgr.committed.len() >= total as usize);
    }

    /// A snapshot's active set excludes both committed transactions and the
    /// snapshot's own transaction.
    #[test]
    fn test_lock_free_snapshot() {
        let mgr = LockFreeMvccManager::new();

        let (finished, _) = mgr.begin();
        mgr.commit(finished);

        let (reader, _) = mgr.begin();
        let reader_snapshot = mgr.acquire_snapshot(reader);

        assert!(!reader_snapshot.active_txns.contains(&finished));
        assert!(!reader_snapshot.active_txns.contains(&reader));
    }
}