1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the transaction manager.
///
/// A transaction record is created as [`Active`](Self::Active) and is moved to
/// [`Committed`](Self::Committed) or [`Aborted`](Self::Aborted) by the manager.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction has begun and has been neither committed nor aborted.
    Active,
    /// The transaction passed validation and was committed.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
21
/// Isolation level requested for a transaction.
///
/// The default is [`SnapshotIsolation`](Self::SnapshotIsolation).
///
/// Within this file the level is consulted only at commit time, where
/// [`Serializable`](Self::Serializable) adds read-set validation on top of the
/// write-write conflict check that every level receives.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IsolationLevel {
    /// Read-committed isolation.
    ///
    /// NOTE(review): commit validation here treats this exactly like
    /// `SnapshotIsolation`; any difference in read visibility would have to be
    /// implemented elsewhere - confirm.
    ReadCommitted,

    /// Snapshot isolation (the default): commit fails only on a write-write
    /// conflict with a transaction that committed after this one started.
    #[default]
    SnapshotIsolation,

    /// Serializable isolation: in addition to write-write conflicts, commit
    /// fails if a transaction that committed after this one started wrote an
    /// entity recorded in this transaction's read set.
    Serializable,
}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum EntityId {
67 Node(NodeId),
69 Edge(EdgeId),
71}
72
73impl From<NodeId> for EntityId {
74 fn from(id: NodeId) -> Self {
75 Self::Node(id)
76 }
77}
78
79impl From<EdgeId> for EntityId {
80 fn from(id: EdgeId) -> Self {
81 Self::Edge(id)
82 }
83}
84
85pub struct TransactionInfo {
87 pub state: TransactionState,
89 pub isolation_level: IsolationLevel,
91 pub start_epoch: EpochId,
93 pub write_set: HashSet<EntityId>,
95 pub read_set: HashSet<EntityId>,
97}
98
99impl TransactionInfo {
100 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102 Self {
103 state: TransactionState::Active,
104 isolation_level,
105 start_epoch,
106 write_set: HashSet::new(),
107 read_set: HashSet::new(),
108 }
109 }
110}
111
112pub struct TransactionManager {
114 next_transaction_id: AtomicU64,
116 current_epoch: AtomicU64,
118 transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
120 committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
123}
124
125impl TransactionManager {
126 #[must_use]
128 pub fn new() -> Self {
129 Self {
130 next_transaction_id: AtomicU64::new(2),
133 current_epoch: AtomicU64::new(0),
134 transactions: RwLock::new(FxHashMap::default()),
135 committed_epochs: RwLock::new(FxHashMap::default()),
136 }
137 }
138
139 pub fn begin(&self) -> TransactionId {
141 self.begin_with_isolation(IsolationLevel::default())
142 }
143
144 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
146 let transaction_id =
147 TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
148 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
149
150 let info = TransactionInfo::new(epoch, isolation_level);
151 self.transactions.write().insert(transaction_id, info);
152 transaction_id
153 }
154
155 pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
157 self.transactions
158 .read()
159 .get(&transaction_id)
160 .map(|info| info.isolation_level)
161 }
162
163 pub fn record_write(
169 &self,
170 transaction_id: TransactionId,
171 entity: impl Into<EntityId>,
172 ) -> Result<()> {
173 let mut txns = self.transactions.write();
174 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
175 Error::Transaction(TransactionError::InvalidState(
176 "Transaction not found".to_string(),
177 ))
178 })?;
179
180 if info.state != TransactionState::Active {
181 return Err(Error::Transaction(TransactionError::InvalidState(
182 "Transaction is not active".to_string(),
183 )));
184 }
185
186 info.write_set.insert(entity.into());
187 Ok(())
188 }
189
190 pub fn record_read(
196 &self,
197 transaction_id: TransactionId,
198 entity: impl Into<EntityId>,
199 ) -> Result<()> {
200 let mut txns = self.transactions.write();
201 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
202 Error::Transaction(TransactionError::InvalidState(
203 "Transaction not found".to_string(),
204 ))
205 })?;
206
207 if info.state != TransactionState::Active {
208 return Err(Error::Transaction(TransactionError::InvalidState(
209 "Transaction is not active".to_string(),
210 )));
211 }
212
213 info.read_set.insert(entity.into());
214 Ok(())
215 }
216
217 pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
235 let mut txns = self.transactions.write();
236 let committed = self.committed_epochs.read();
237
238 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
240 let info = txns.get(&transaction_id).ok_or_else(|| {
241 Error::Transaction(TransactionError::InvalidState(
242 "Transaction not found".to_string(),
243 ))
244 })?;
245
246 if info.state != TransactionState::Active {
247 return Err(Error::Transaction(TransactionError::InvalidState(
248 "Transaction is not active".to_string(),
249 )));
250 }
251
252 (
253 info.isolation_level,
254 info.start_epoch,
255 info.write_set.clone(),
256 info.read_set.clone(),
257 )
258 };
259
260 for (other_tx, commit_epoch) in committed.iter() {
265 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
266 if let Some(other_info) = txns.get(other_tx) {
268 for entity in &our_write_set {
269 if other_info.write_set.contains(entity) {
270 return Err(Error::Transaction(TransactionError::WriteConflict(
271 format!("Write-write conflict on entity {:?}", entity),
272 )));
273 }
274 }
275 }
276 }
277 }
278
279 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
284 for (other_tx, commit_epoch) in committed.iter() {
285 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
286 if let Some(other_info) = txns.get(other_tx) {
288 for entity in &our_read_set {
289 if other_info.write_set.contains(entity) {
290 return Err(Error::Transaction(
291 TransactionError::SerializationFailure(format!(
292 "Read-write conflict on entity {:?}: \
293 another transaction modified data we read",
294 entity
295 )),
296 ));
297 }
298 }
299 }
300 }
301 }
302
303 for (other_tx, other_info) in txns.iter() {
306 if *other_tx == transaction_id {
307 continue;
308 }
309 if other_info.state == TransactionState::Committed {
310 for entity in &our_read_set {
312 if other_info.write_set.contains(entity) {
313 if let Some(commit_epoch) = committed.get(other_tx)
315 && commit_epoch.as_u64() > our_start_epoch.as_u64()
316 {
317 return Err(Error::Transaction(
318 TransactionError::SerializationFailure(format!(
319 "Read-write conflict on entity {:?}: \
320 another transaction modified data we read",
321 entity
322 )),
323 ));
324 }
325 }
326 }
327 }
328 }
329 }
330
331 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
334
335 if let Some(info) = txns.get_mut(&transaction_id) {
337 info.state = TransactionState::Committed;
338 }
339
340 drop(committed);
342 self.committed_epochs
343 .write()
344 .insert(transaction_id, commit_epoch);
345
346 Ok(commit_epoch)
347 }
348
349 pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
355 let mut txns = self.transactions.write();
356
357 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
358 Error::Transaction(TransactionError::InvalidState(
359 "Transaction not found".to_string(),
360 ))
361 })?;
362
363 if info.state != TransactionState::Active {
364 return Err(Error::Transaction(TransactionError::InvalidState(
365 "Transaction is not active".to_string(),
366 )));
367 }
368
369 info.state = TransactionState::Aborted;
370 Ok(())
371 }
372
373 pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
378 let txns = self.transactions.read();
379 let info = txns.get(&transaction_id).ok_or_else(|| {
380 Error::Transaction(TransactionError::InvalidState(
381 "Transaction not found".to_string(),
382 ))
383 })?;
384 Ok(info.write_set.clone())
385 }
386
387 pub fn reset_write_set(
393 &self,
394 transaction_id: TransactionId,
395 write_set: HashSet<EntityId>,
396 ) -> Result<()> {
397 let mut txns = self.transactions.write();
398 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
399 Error::Transaction(TransactionError::InvalidState(
400 "Transaction not found".to_string(),
401 ))
402 })?;
403 info.write_set = write_set;
404 Ok(())
405 }
406
407 pub fn abort_all_active(&self) {
411 let mut txns = self.transactions.write();
412 for info in txns.values_mut() {
413 if info.state == TransactionState::Active {
414 info.state = TransactionState::Aborted;
415 }
416 }
417 }
418
419 pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
421 self.transactions
422 .read()
423 .get(&transaction_id)
424 .map(|info| info.state)
425 }
426
427 pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
429 self.transactions
430 .read()
431 .get(&transaction_id)
432 .map(|info| info.start_epoch)
433 }
434
435 #[must_use]
437 pub fn current_epoch(&self) -> EpochId {
438 EpochId::new(self.current_epoch.load(Ordering::Acquire))
439 }
440
441 #[must_use]
446 pub fn min_active_epoch(&self) -> EpochId {
447 let txns = self.transactions.read();
448 txns.values()
449 .filter(|info| info.state == TransactionState::Active)
450 .map(|info| info.start_epoch)
451 .min()
452 .unwrap_or_else(|| self.current_epoch())
453 }
454
455 #[must_use]
457 pub fn active_count(&self) -> usize {
458 self.transactions
459 .read()
460 .values()
461 .filter(|info| info.state == TransactionState::Active)
462 .count()
463 }
464
465 pub fn gc(&self) -> usize {
473 let mut txns = self.transactions.write();
474 let mut committed = self.committed_epochs.write();
475
476 let min_active_start = txns
478 .values()
479 .filter(|info| info.state == TransactionState::Active)
480 .map(|info| info.start_epoch)
481 .min();
482
483 let initial_count = txns.len();
484
485 let to_remove: Vec<TransactionId> = txns
487 .iter()
488 .filter(|(transaction_id, info)| {
489 match info.state {
490 TransactionState::Active => false, TransactionState::Aborted => true, TransactionState::Committed => {
493 if let Some(min_start) = min_active_start {
496 if let Some(commit_epoch) = committed.get(*transaction_id) {
497 commit_epoch.as_u64() < min_start.as_u64()
499 } else {
500 false
502 }
503 } else {
504 true
506 }
507 }
508 }
509 })
510 .map(|(id, _)| *id)
511 .collect();
512
513 for id in &to_remove {
514 txns.remove(id);
515 committed.remove(id);
516 }
517
518 initial_count - txns.len()
519 }
520
521 pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
525 self.committed_epochs.write().insert(transaction_id, epoch);
526 }
527
528 #[must_use]
532 pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
533 let next = self.next_transaction_id.load(Ordering::Relaxed);
534 if next > 1 {
535 Some(TransactionId::new(next - 1))
536 } else {
537 None
538 }
539 }
540}
541
542impl Default for TransactionManager {
543 fn default() -> Self {
544 Self::new()
545 }
546}
547
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `outcome` is an error whose rendered message contains
    /// `needle`; `context` is shown if the message does not match.
    fn assert_fails_with(outcome: Result<EpochId>, needle: &str, context: &str) {
        assert!(outcome.is_err());
        assert!(
            outcome.unwrap_err().to_string().contains(needle),
            "{}",
            context
        );
    }

    /// A transaction is active after `begin` and committed after `commit`.
    #[test]
    fn test_begin_commit() {
        let manager = TransactionManager::new();

        let txn = manager.begin();
        assert_eq!(manager.state(txn), Some(TransactionState::Active));

        let epoch = manager.commit(txn).unwrap();
        assert_eq!(manager.state(txn), Some(TransactionState::Committed));
        assert!(epoch.as_u64() > 0);
    }

    /// Aborting an active transaction leaves it in the aborted state.
    #[test]
    fn test_begin_abort() {
        let manager = TransactionManager::new();

        let txn = manager.begin();
        manager.abort(txn).unwrap();

        assert_eq!(manager.state(txn), Some(TransactionState::Aborted));
    }

    /// A commit advances the global epoch and returns an epoch past the old one.
    #[test]
    fn test_epoch_advancement() {
        let manager = TransactionManager::new();
        let before = manager.current_epoch().as_u64();

        let txn = manager.begin();
        let committed_at = manager.commit(txn).unwrap().as_u64();

        assert!(manager.current_epoch().as_u64() > before);
        assert!(committed_at > before);
    }

    /// A commit that an active transaction overlaps with survives `gc`.
    #[test]
    fn test_gc_preserves_needed_write_sets() {
        let manager = TransactionManager::new();

        let finished = manager.begin();
        let running = manager.begin();

        manager.commit(finished).unwrap();
        assert_eq!(manager.active_count(), 1);

        assert_eq!(manager.gc(), 0);

        assert_eq!(manager.state(finished), Some(TransactionState::Committed));
        assert_eq!(manager.state(running), Some(TransactionState::Active));
    }

    /// Commits older than every active start epoch are collected; once nothing
    /// is active, all committed records go.
    #[test]
    fn test_gc_removes_old_commits() {
        let manager = TransactionManager::new();

        let first = manager.begin();
        manager.commit(first).unwrap();

        let second = manager.begin();
        manager.commit(second).unwrap();

        let third = manager.begin();

        let removed = manager.gc();
        assert_eq!(removed, 1);
        assert_eq!(manager.state(first), None);
        assert_eq!(manager.state(second), Some(TransactionState::Committed));
        assert_eq!(manager.state(third), Some(TransactionState::Active));

        manager.commit(third).unwrap();
        let removed = manager.gc();
        assert_eq!(removed, 2);
    }

    /// Aborted transactions are collected even while others are active.
    #[test]
    fn test_gc_removes_aborted() {
        let manager = TransactionManager::new();

        let doomed = manager.begin();
        let running = manager.begin();

        manager.abort(doomed).unwrap();
        assert_eq!(manager.gc(), 1);

        assert_eq!(manager.state(doomed), None);
        assert_eq!(manager.state(running), Some(TransactionState::Active));
    }

    /// Node and edge writes can both be recorded and then committed.
    #[test]
    fn test_write_tracking() {
        let manager = TransactionManager::new();
        let txn = manager.begin();

        manager.record_write(txn, NodeId::new(1)).unwrap();
        manager.record_write(txn, NodeId::new(2)).unwrap();
        manager.record_write(txn, EdgeId::new(100)).unwrap();

        assert!(manager.commit(txn).is_ok());
    }

    /// The minimum active epoch falls back to the current epoch when idle and
    /// otherwise follows the oldest active transaction.
    #[test]
    fn test_min_active_epoch() {
        let manager = TransactionManager::new();

        assert_eq!(manager.min_active_epoch(), manager.current_epoch());

        let oldest = manager.begin();
        let oldest_start = manager.start_epoch(oldest).unwrap();

        let short_lived = manager.begin();
        manager.commit(short_lived).unwrap();

        let _newest = manager.begin();

        assert_eq!(manager.min_active_epoch(), oldest_start);
    }

    /// `abort_all_active` aborts only the transactions that are still active.
    #[test]
    fn test_abort_all_active() {
        let manager = TransactionManager::new();

        let committed = manager.begin();
        let pending_a = manager.begin();
        let pending_b = manager.begin();

        manager.commit(committed).unwrap();
        manager.abort_all_active();

        assert_eq!(manager.state(committed), Some(TransactionState::Committed));
        assert_eq!(manager.state(pending_a), Some(TransactionState::Aborted));
        assert_eq!(manager.state(pending_b), Some(TransactionState::Aborted));
    }

    /// A transaction begun after a commit starts at a later epoch.
    #[test]
    fn test_start_epoch_snapshot() {
        let manager = TransactionManager::new();

        let earlier = manager.begin();
        let earlier_start = manager.start_epoch(earlier).unwrap();

        manager.commit(earlier).unwrap();

        let later = manager.begin();
        let later_start = manager.start_epoch(later).unwrap();

        assert!(later_start.as_u64() > earlier_start.as_u64());
    }

    /// Of two concurrent writers to the same entity, the second commit fails.
    #[test]
    fn test_write_write_conflict_detection() {
        let manager = TransactionManager::new();

        let winner = manager.begin();
        let loser = manager.begin();

        let contested = NodeId::new(42);
        manager.record_write(winner, contested).unwrap();
        manager.record_write(loser, contested).unwrap();

        assert!(manager.commit(winner).is_ok());

        assert_fails_with(
            manager.commit(loser),
            "Write-write conflict",
            "Expected write-write conflict error",
        );
    }

    /// Sequential commits receive strictly increasing epochs.
    #[test]
    fn test_commit_epoch_monotonicity() {
        let manager = TransactionManager::new();

        let epochs: Vec<u64> = (0..10)
            .map(|_| {
                let txn = manager.begin();
                manager.commit(txn).unwrap().as_u64()
            })
            .collect();

        for (index, pair) in epochs.windows(2).enumerate() {
            let (previous, current) = (pair[0], pair[1]);
            assert!(
                current > previous,
                "Epoch {} ({}) should be greater than epoch {} ({})",
                index + 1,
                current,
                index,
                previous
            );
        }
    }

    /// Commits from many threads get unique epochs and the final epoch equals
    /// the number of commits.
    #[test]
    fn test_concurrent_commits_via_threads() {
        use std::sync::Arc;
        use std::thread;

        const THREADS: u64 = 10;
        const COMMITS_PER_THREAD: u64 = 100;

        let manager = Arc::new(TransactionManager::new());

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let manager = Arc::clone(&manager);
                thread::spawn(move || {
                    (0..COMMITS_PER_THREAD)
                        .map(|_| {
                            let txn = manager.begin();
                            manager.commit(txn).unwrap().as_u64()
                        })
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut observed: Vec<u64> = Vec::new();
        for worker in workers {
            observed.extend(worker.join().unwrap());
        }

        let total = observed.len();
        observed.sort_unstable();
        observed.dedup();
        assert_eq!(observed.len(), total, "All commit epochs should be unique");

        assert_eq!(
            manager.current_epoch().as_u64(),
            THREADS * COMMITS_PER_THREAD,
            "Final epoch should equal total commits"
        );
    }

    /// `begin` uses snapshot isolation.
    #[test]
    fn test_isolation_level_default() {
        let manager = TransactionManager::new();

        let txn = manager.begin();

        assert_eq!(
            manager.isolation_level(txn),
            Some(IsolationLevel::SnapshotIsolation)
        );
    }

    /// `begin_with_isolation` stores exactly the level it was given.
    #[test]
    fn test_isolation_level_explicit() {
        let manager = TransactionManager::new();

        let levels = [
            IsolationLevel::ReadCommitted,
            IsolationLevel::SnapshotIsolation,
            IsolationLevel::Serializable,
        ];
        let started: Vec<_> = levels
            .iter()
            .map(|&level| (manager.begin_with_isolation(level), level))
            .collect();

        for (txn, level) in started {
            assert_eq!(manager.isolation_level(txn), Some(level));
        }
    }

    /// A serializable reader fails if a concurrent writer committed first.
    #[test]
    fn test_ssi_read_write_conflict_detected() {
        let manager = TransactionManager::new();

        let reader = manager.begin_with_isolation(IsolationLevel::Serializable);
        let writer = manager.begin();

        let shared = NodeId::new(42);
        manager.record_read(reader, shared).unwrap();

        manager.record_write(writer, shared).unwrap();
        manager.commit(writer).unwrap();

        assert_fails_with(
            manager.commit(reader),
            "Serialization failure",
            "Expected serialization failure error",
        );
    }

    /// The same interleaving is accepted under snapshot isolation.
    #[test]
    fn test_ssi_no_conflict_when_not_serializable() {
        let manager = TransactionManager::new();

        let reader = manager.begin();
        let writer = manager.begin();

        let shared = NodeId::new(42);
        manager.record_read(reader, shared).unwrap();

        manager.record_write(writer, shared).unwrap();
        manager.commit(writer).unwrap();

        let outcome = manager.commit(reader);
        assert!(
            outcome.is_ok(),
            "Snapshot Isolation should not detect read-write conflicts"
        );
    }

    /// A writer that committed before the reader began is not a conflict.
    #[test]
    fn test_ssi_no_conflict_when_write_before_read() {
        let manager = TransactionManager::new();

        let shared = NodeId::new(42);

        let writer = manager.begin();
        manager.record_write(writer, shared).unwrap();
        manager.commit(writer).unwrap();

        let reader = manager.begin_with_isolation(IsolationLevel::Serializable);
        manager.record_read(reader, shared).unwrap();

        let outcome = manager.commit(reader);
        assert!(
            outcome.is_ok(),
            "Should not conflict when writer committed before reader started"
        );
    }

    /// Two serializable transactions that read both accounts and each write a
    /// different one cannot both commit.
    #[test]
    fn test_write_skew_prevented_by_ssi() {
        let manager = TransactionManager::new();

        let account_a = NodeId::new(1);
        let account_b = NodeId::new(2);

        let first = manager.begin_with_isolation(IsolationLevel::Serializable);
        let second = manager.begin_with_isolation(IsolationLevel::Serializable);

        for txn in [first, second] {
            manager.record_read(txn, account_a).unwrap();
            manager.record_read(txn, account_b).unwrap();
        }

        manager.record_write(first, account_a).unwrap();
        manager.record_write(second, account_b).unwrap();

        let first_outcome = manager.commit(first);
        assert!(first_outcome.is_ok(), "First commit should succeed");

        let second_outcome = manager.commit(second);
        assert!(
            second_outcome.is_err(),
            "Second commit should fail due to SSI"
        );
        assert!(
            second_outcome
                .unwrap_err()
                .to_string()
                .contains("Serialization failure"),
            "Expected serialization failure error for write skew prevention"
        );
    }

    /// Read-committed transactions are not failed by a concurrent write to
    /// something they read.
    #[test]
    fn test_read_committed_allows_non_repeatable_reads() {
        let manager = TransactionManager::new();

        let reader = manager.begin_with_isolation(IsolationLevel::ReadCommitted);
        let shared = NodeId::new(42);

        manager.record_read(reader, shared).unwrap();

        let writer = manager.begin();
        manager.record_write(writer, shared).unwrap();
        manager.commit(writer).unwrap();

        let outcome = manager.commit(reader);
        assert!(
            outcome.is_ok(),
            "ReadCommitted should allow non-repeatable reads"
        );
    }

    /// `Debug` renders each isolation level as its variant name.
    #[test]
    fn test_isolation_level_debug() {
        let expected = [
            (IsolationLevel::ReadCommitted, "ReadCommitted"),
            (IsolationLevel::SnapshotIsolation, "SnapshotIsolation"),
            (IsolationLevel::Serializable, "Serializable"),
        ];

        for (level, rendered) in expected {
            assert_eq!(format!("{:?}", level), rendered);
        }
    }

    /// `Default` for `IsolationLevel` is snapshot isolation.
    #[test]
    fn test_isolation_level_default_trait() {
        let level = IsolationLevel::default();
        assert_eq!(level, IsolationLevel::SnapshotIsolation);
    }

    /// Two serializable transactions that only read the same entity both commit.
    #[test]
    fn test_ssi_concurrent_reads_no_conflict() {
        let manager = TransactionManager::new();
        let shared = NodeId::new(42);

        let first = manager.begin_with_isolation(IsolationLevel::Serializable);
        let second = manager.begin_with_isolation(IsolationLevel::Serializable);

        manager.record_read(first, shared).unwrap();
        manager.record_read(second, shared).unwrap();

        assert!(manager.commit(first).is_ok());
        assert!(manager.commit(second).is_ok());
    }

    /// Write-write conflicts are also detected between serializable transactions.
    #[test]
    fn test_ssi_write_write_conflict() {
        let manager = TransactionManager::new();
        let shared = NodeId::new(42);

        let first = manager.begin_with_isolation(IsolationLevel::Serializable);
        let second = manager.begin_with_isolation(IsolationLevel::Serializable);

        manager.record_write(first, shared).unwrap();
        manager.record_write(second, shared).unwrap();

        assert!(manager.commit(first).is_ok());

        let outcome = manager.commit(second);
        assert!(outcome.is_err());
    }
}