1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum TransactionState {
14 Active,
16 Committed,
18 Aborted,
20}
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
41pub enum IsolationLevel {
42 ReadCommitted,
47
48 #[default]
54 SnapshotIsolation,
55
56 Serializable,
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum EntityId {
67 Node(NodeId),
69 Edge(EdgeId),
71}
72
73impl From<NodeId> for EntityId {
74 fn from(id: NodeId) -> Self {
75 Self::Node(id)
76 }
77}
78
79impl From<EdgeId> for EntityId {
80 fn from(id: EdgeId) -> Self {
81 Self::Edge(id)
82 }
83}
84
85pub struct TransactionInfo {
87 pub state: TransactionState,
89 pub isolation_level: IsolationLevel,
91 pub start_epoch: EpochId,
93 pub write_set: HashSet<EntityId>,
95 pub read_set: HashSet<EntityId>,
97}
98
99impl TransactionInfo {
100 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102 Self {
103 state: TransactionState::Active,
104 isolation_level,
105 start_epoch,
106 write_set: HashSet::new(),
107 read_set: HashSet::new(),
108 }
109 }
110}
111
112pub struct TransactionManager {
114 next_transaction_id: AtomicU64,
116 current_epoch: AtomicU64,
118 transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
120 committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
123}
124
125impl TransactionManager {
126 #[must_use]
128 pub fn new() -> Self {
129 Self {
130 next_transaction_id: AtomicU64::new(2),
133 current_epoch: AtomicU64::new(0),
134 transactions: RwLock::new(FxHashMap::default()),
135 committed_epochs: RwLock::new(FxHashMap::default()),
136 }
137 }
138
139 pub fn begin(&self) -> TransactionId {
141 self.begin_with_isolation(IsolationLevel::default())
142 }
143
144 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
146 let transaction_id =
147 TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
148 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
149
150 let info = TransactionInfo::new(epoch, isolation_level);
151 self.transactions.write().insert(transaction_id, info);
152 transaction_id
153 }
154
155 pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
157 self.transactions
158 .read()
159 .get(&transaction_id)
160 .map(|info| info.isolation_level)
161 }
162
163 pub fn record_write(
169 &self,
170 transaction_id: TransactionId,
171 entity: impl Into<EntityId>,
172 ) -> Result<()> {
173 let mut txns = self.transactions.write();
174 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
175 Error::Transaction(TransactionError::InvalidState(
176 "Transaction not found".to_string(),
177 ))
178 })?;
179
180 if info.state != TransactionState::Active {
181 return Err(Error::Transaction(TransactionError::InvalidState(
182 "Transaction is not active".to_string(),
183 )));
184 }
185
186 info.write_set.insert(entity.into());
187 Ok(())
188 }
189
190 pub fn record_read(
196 &self,
197 transaction_id: TransactionId,
198 entity: impl Into<EntityId>,
199 ) -> Result<()> {
200 let mut txns = self.transactions.write();
201 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
202 Error::Transaction(TransactionError::InvalidState(
203 "Transaction not found".to_string(),
204 ))
205 })?;
206
207 if info.state != TransactionState::Active {
208 return Err(Error::Transaction(TransactionError::InvalidState(
209 "Transaction is not active".to_string(),
210 )));
211 }
212
213 info.read_set.insert(entity.into());
214 Ok(())
215 }
216
217 pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
235 let mut txns = self.transactions.write();
236 let committed = self.committed_epochs.read();
237
238 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
240 let info = txns.get(&transaction_id).ok_or_else(|| {
241 Error::Transaction(TransactionError::InvalidState(
242 "Transaction not found".to_string(),
243 ))
244 })?;
245
246 if info.state != TransactionState::Active {
247 return Err(Error::Transaction(TransactionError::InvalidState(
248 "Transaction is not active".to_string(),
249 )));
250 }
251
252 (
253 info.isolation_level,
254 info.start_epoch,
255 info.write_set.clone(),
256 info.read_set.clone(),
257 )
258 };
259
260 for (other_tx, other_info) in txns.iter() {
262 if *other_tx == transaction_id {
263 continue;
264 }
265 if other_info.state == TransactionState::Committed {
266 for entity in &our_write_set {
268 if other_info.write_set.contains(entity) {
269 return Err(Error::Transaction(TransactionError::WriteConflict(
270 format!("Write-write conflict on entity {:?}", entity),
271 )));
272 }
273 }
274 }
275 }
276
277 for (other_tx, commit_epoch) in committed.iter() {
279 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
280 if let Some(other_info) = txns.get(other_tx) {
282 for entity in &our_write_set {
283 if other_info.write_set.contains(entity) {
284 return Err(Error::Transaction(TransactionError::WriteConflict(
285 format!("Write-write conflict on entity {:?}", entity),
286 )));
287 }
288 }
289 }
290 }
291 }
292
293 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
298 for (other_tx, commit_epoch) in committed.iter() {
299 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
300 if let Some(other_info) = txns.get(other_tx) {
302 for entity in &our_read_set {
303 if other_info.write_set.contains(entity) {
304 return Err(Error::Transaction(
305 TransactionError::SerializationFailure(format!(
306 "Read-write conflict on entity {:?}: \
307 another transaction modified data we read",
308 entity
309 )),
310 ));
311 }
312 }
313 }
314 }
315 }
316
317 for (other_tx, other_info) in txns.iter() {
320 if *other_tx == transaction_id {
321 continue;
322 }
323 if other_info.state == TransactionState::Committed {
324 for entity in &our_read_set {
326 if other_info.write_set.contains(entity) {
327 if let Some(commit_epoch) = committed.get(other_tx)
329 && commit_epoch.as_u64() > our_start_epoch.as_u64()
330 {
331 return Err(Error::Transaction(
332 TransactionError::SerializationFailure(format!(
333 "Read-write conflict on entity {:?}: \
334 another transaction modified data we read",
335 entity
336 )),
337 ));
338 }
339 }
340 }
341 }
342 }
343 }
344
345 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
348
349 if let Some(info) = txns.get_mut(&transaction_id) {
351 info.state = TransactionState::Committed;
352 }
353
354 drop(committed);
356 self.committed_epochs
357 .write()
358 .insert(transaction_id, commit_epoch);
359
360 Ok(commit_epoch)
361 }
362
363 pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
369 let mut txns = self.transactions.write();
370
371 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
372 Error::Transaction(TransactionError::InvalidState(
373 "Transaction not found".to_string(),
374 ))
375 })?;
376
377 if info.state != TransactionState::Active {
378 return Err(Error::Transaction(TransactionError::InvalidState(
379 "Transaction is not active".to_string(),
380 )));
381 }
382
383 info.state = TransactionState::Aborted;
384 Ok(())
385 }
386
387 pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
392 let txns = self.transactions.read();
393 let info = txns.get(&transaction_id).ok_or_else(|| {
394 Error::Transaction(TransactionError::InvalidState(
395 "Transaction not found".to_string(),
396 ))
397 })?;
398 Ok(info.write_set.clone())
399 }
400
401 pub fn reset_write_set(
407 &self,
408 transaction_id: TransactionId,
409 write_set: HashSet<EntityId>,
410 ) -> Result<()> {
411 let mut txns = self.transactions.write();
412 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
413 Error::Transaction(TransactionError::InvalidState(
414 "Transaction not found".to_string(),
415 ))
416 })?;
417 info.write_set = write_set;
418 Ok(())
419 }
420
421 pub fn abort_all_active(&self) {
425 let mut txns = self.transactions.write();
426 for info in txns.values_mut() {
427 if info.state == TransactionState::Active {
428 info.state = TransactionState::Aborted;
429 }
430 }
431 }
432
433 pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
435 self.transactions
436 .read()
437 .get(&transaction_id)
438 .map(|info| info.state)
439 }
440
441 pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
443 self.transactions
444 .read()
445 .get(&transaction_id)
446 .map(|info| info.start_epoch)
447 }
448
449 #[must_use]
451 pub fn current_epoch(&self) -> EpochId {
452 EpochId::new(self.current_epoch.load(Ordering::Acquire))
453 }
454
455 #[must_use]
460 pub fn min_active_epoch(&self) -> EpochId {
461 let txns = self.transactions.read();
462 txns.values()
463 .filter(|info| info.state == TransactionState::Active)
464 .map(|info| info.start_epoch)
465 .min()
466 .unwrap_or_else(|| self.current_epoch())
467 }
468
469 #[must_use]
471 pub fn active_count(&self) -> usize {
472 self.transactions
473 .read()
474 .values()
475 .filter(|info| info.state == TransactionState::Active)
476 .count()
477 }
478
479 pub fn gc(&self) -> usize {
487 let mut txns = self.transactions.write();
488 let mut committed = self.committed_epochs.write();
489
490 let min_active_start = txns
492 .values()
493 .filter(|info| info.state == TransactionState::Active)
494 .map(|info| info.start_epoch)
495 .min();
496
497 let initial_count = txns.len();
498
499 let to_remove: Vec<TransactionId> = txns
501 .iter()
502 .filter(|(transaction_id, info)| {
503 match info.state {
504 TransactionState::Active => false, TransactionState::Aborted => true, TransactionState::Committed => {
507 if let Some(min_start) = min_active_start {
510 if let Some(commit_epoch) = committed.get(*transaction_id) {
511 commit_epoch.as_u64() < min_start.as_u64()
513 } else {
514 false
516 }
517 } else {
518 true
520 }
521 }
522 }
523 })
524 .map(|(id, _)| *id)
525 .collect();
526
527 for id in &to_remove {
528 txns.remove(id);
529 committed.remove(id);
530 }
531
532 initial_count - txns.len()
533 }
534
535 pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
539 self.committed_epochs.write().insert(transaction_id, epoch);
540 }
541
542 #[must_use]
546 pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
547 let next = self.next_transaction_id.load(Ordering::Relaxed);
548 if next > 1 {
549 Some(TransactionId::new(next - 1))
550 } else {
551 None
552 }
553 }
554}
555
556impl Default for TransactionManager {
557 fn default() -> Self {
558 Self::new()
559 }
560}
561
562#[cfg(test)]
563mod tests {
564 use super::*;
565
566 #[test]
567 fn test_begin_commit() {
568 let mgr = TransactionManager::new();
569
570 let tx = mgr.begin();
571 assert_eq!(mgr.state(tx), Some(TransactionState::Active));
572
573 let commit_epoch = mgr.commit(tx).unwrap();
574 assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
575 assert!(commit_epoch.as_u64() > 0);
576 }
577
578 #[test]
579 fn test_begin_abort() {
580 let mgr = TransactionManager::new();
581
582 let tx = mgr.begin();
583 mgr.abort(tx).unwrap();
584 assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
585 }
586
587 #[test]
588 fn test_epoch_advancement() {
589 let mgr = TransactionManager::new();
590
591 let initial_epoch = mgr.current_epoch();
592
593 let tx = mgr.begin();
594 let commit_epoch = mgr.commit(tx).unwrap();
595
596 assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
597 assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
598 }
599
600 #[test]
601 fn test_gc_preserves_needed_write_sets() {
602 let mgr = TransactionManager::new();
603
604 let tx1 = mgr.begin();
605 let tx2 = mgr.begin();
606
607 mgr.commit(tx1).unwrap();
608 assert_eq!(mgr.active_count(), 1);
611
612 let cleaned = mgr.gc();
614 assert_eq!(cleaned, 0);
615
616 assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
618 assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
619 }
620
621 #[test]
622 fn test_gc_removes_old_commits() {
623 let mgr = TransactionManager::new();
624
625 let tx1 = mgr.begin();
627 mgr.commit(tx1).unwrap();
628
629 let tx2 = mgr.begin();
631 mgr.commit(tx2).unwrap();
632
633 let tx3 = mgr.begin();
635
636 let cleaned = mgr.gc();
640 assert_eq!(cleaned, 1); assert_eq!(mgr.state(tx1), None);
643 assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
645
646 mgr.commit(tx3).unwrap();
648 let cleaned = mgr.gc();
649 assert_eq!(cleaned, 2); }
651
652 #[test]
653 fn test_gc_removes_aborted() {
654 let mgr = TransactionManager::new();
655
656 let tx1 = mgr.begin();
657 let tx2 = mgr.begin();
658
659 mgr.abort(tx1).unwrap();
660 let cleaned = mgr.gc();
664 assert_eq!(cleaned, 1);
665
666 assert_eq!(mgr.state(tx1), None);
667 assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
668 }
669
670 #[test]
671 fn test_write_tracking() {
672 let mgr = TransactionManager::new();
673
674 let tx = mgr.begin();
675
676 mgr.record_write(tx, NodeId::new(1)).unwrap();
678 mgr.record_write(tx, NodeId::new(2)).unwrap();
679 mgr.record_write(tx, EdgeId::new(100)).unwrap();
680
681 assert!(mgr.commit(tx).is_ok());
683 }
684
685 #[test]
686 fn test_min_active_epoch() {
687 let mgr = TransactionManager::new();
688
689 assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
691
692 let tx1 = mgr.begin();
694 let epoch1 = mgr.start_epoch(tx1).unwrap();
695
696 let tx2 = mgr.begin();
698 mgr.commit(tx2).unwrap();
699
700 let _tx3 = mgr.begin();
701
702 assert_eq!(mgr.min_active_epoch(), epoch1);
704 }
705
706 #[test]
707 fn test_abort_all_active() {
708 let mgr = TransactionManager::new();
709
710 let tx1 = mgr.begin();
711 let tx2 = mgr.begin();
712 let tx3 = mgr.begin();
713
714 mgr.commit(tx1).unwrap();
715 mgr.abort_all_active();
718
719 assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
721 assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
722 }
723
724 #[test]
725 fn test_start_epoch_snapshot() {
726 let mgr = TransactionManager::new();
727
728 let tx1 = mgr.begin();
730 let start1 = mgr.start_epoch(tx1).unwrap();
731
732 mgr.commit(tx1).unwrap();
734
735 let tx2 = mgr.begin();
737 let start2 = mgr.start_epoch(tx2).unwrap();
738
739 assert!(start2.as_u64() > start1.as_u64());
741 }
742
743 #[test]
744 fn test_write_write_conflict_detection() {
745 let mgr = TransactionManager::new();
746
747 let tx1 = mgr.begin();
749 let tx2 = mgr.begin();
750
751 let entity = NodeId::new(42);
753 mgr.record_write(tx1, entity).unwrap();
754 mgr.record_write(tx2, entity).unwrap();
755
756 let result1 = mgr.commit(tx1);
758 assert!(result1.is_ok());
759
760 let result2 = mgr.commit(tx2);
762 assert!(result2.is_err());
763 assert!(
764 result2
765 .unwrap_err()
766 .to_string()
767 .contains("Write-write conflict"),
768 "Expected write-write conflict error"
769 );
770 }
771
772 #[test]
773 fn test_commit_epoch_monotonicity() {
774 let mgr = TransactionManager::new();
775
776 let mut epochs = Vec::new();
777
778 for _ in 0..10 {
780 let tx = mgr.begin();
781 let epoch = mgr.commit(tx).unwrap();
782 epochs.push(epoch.as_u64());
783 }
784
785 for i in 1..epochs.len() {
787 assert!(
788 epochs[i] > epochs[i - 1],
789 "Epoch {} ({}) should be greater than epoch {} ({})",
790 i,
791 epochs[i],
792 i - 1,
793 epochs[i - 1]
794 );
795 }
796 }
797
798 #[test]
799 fn test_concurrent_commits_via_threads() {
800 use std::sync::Arc;
801 use std::thread;
802
803 let mgr = Arc::new(TransactionManager::new());
804 let num_threads = 10;
805 let commits_per_thread = 100;
806
807 let handles: Vec<_> = (0..num_threads)
808 .map(|_| {
809 let mgr = Arc::clone(&mgr);
810 thread::spawn(move || {
811 let mut epochs = Vec::new();
812 for _ in 0..commits_per_thread {
813 let tx = mgr.begin();
814 let epoch = mgr.commit(tx).unwrap();
815 epochs.push(epoch.as_u64());
816 }
817 epochs
818 })
819 })
820 .collect();
821
822 let mut all_epochs: Vec<u64> = handles
823 .into_iter()
824 .flat_map(|h| h.join().unwrap())
825 .collect();
826
827 all_epochs.sort_unstable();
829 let unique_count = all_epochs.len();
830 all_epochs.dedup();
831 assert_eq!(
832 all_epochs.len(),
833 unique_count,
834 "All commit epochs should be unique"
835 );
836
837 assert_eq!(
839 mgr.current_epoch().as_u64(),
840 (num_threads * commits_per_thread) as u64,
841 "Final epoch should equal total commits"
842 );
843 }
844
845 #[test]
846 fn test_isolation_level_default() {
847 let mgr = TransactionManager::new();
848
849 let tx = mgr.begin();
850 assert_eq!(
851 mgr.isolation_level(tx),
852 Some(IsolationLevel::SnapshotIsolation)
853 );
854 }
855
856 #[test]
857 fn test_isolation_level_explicit() {
858 let mgr = TransactionManager::new();
859
860 let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
861 let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
862 let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
863
864 assert_eq!(
865 mgr.isolation_level(transaction_rc),
866 Some(IsolationLevel::ReadCommitted)
867 );
868 assert_eq!(
869 mgr.isolation_level(transaction_si),
870 Some(IsolationLevel::SnapshotIsolation)
871 );
872 assert_eq!(
873 mgr.isolation_level(transaction_ser),
874 Some(IsolationLevel::Serializable)
875 );
876 }
877
878 #[test]
879 fn test_ssi_read_write_conflict_detected() {
880 let mgr = TransactionManager::new();
881
882 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
884
885 let tx2 = mgr.begin();
887
888 let entity = NodeId::new(42);
890 mgr.record_read(tx1, entity).unwrap();
891
892 mgr.record_write(tx2, entity).unwrap();
894 mgr.commit(tx2).unwrap();
895
896 let result = mgr.commit(tx1);
898 assert!(result.is_err());
899 assert!(
900 result
901 .unwrap_err()
902 .to_string()
903 .contains("Serialization failure"),
904 "Expected serialization failure error"
905 );
906 }
907
908 #[test]
909 fn test_ssi_no_conflict_when_not_serializable() {
910 let mgr = TransactionManager::new();
911
912 let tx1 = mgr.begin();
914
915 let tx2 = mgr.begin();
917
918 let entity = NodeId::new(42);
920 mgr.record_read(tx1, entity).unwrap();
921
922 mgr.record_write(tx2, entity).unwrap();
924 mgr.commit(tx2).unwrap();
925
926 let result = mgr.commit(tx1);
928 assert!(
929 result.is_ok(),
930 "Snapshot Isolation should not detect read-write conflicts"
931 );
932 }
933
934 #[test]
935 fn test_ssi_no_conflict_when_write_before_read() {
936 let mgr = TransactionManager::new();
937
938 let tx1 = mgr.begin();
940 let entity = NodeId::new(42);
941 mgr.record_write(tx1, entity).unwrap();
942 mgr.commit(tx1).unwrap();
943
944 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
946 mgr.record_read(tx2, entity).unwrap();
947
948 let result = mgr.commit(tx2);
950 assert!(
951 result.is_ok(),
952 "Should not conflict when writer committed before reader started"
953 );
954 }
955
956 #[test]
957 fn test_write_skew_prevented_by_ssi() {
958 let mgr = TransactionManager::new();
965
966 let account_a = NodeId::new(1);
967 let account_b = NodeId::new(2);
968
969 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
971 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
972
973 mgr.record_read(tx1, account_a).unwrap();
975 mgr.record_read(tx1, account_b).unwrap();
976 mgr.record_read(tx2, account_a).unwrap();
977 mgr.record_read(tx2, account_b).unwrap();
978
979 mgr.record_write(tx1, account_a).unwrap();
981 mgr.record_write(tx2, account_b).unwrap();
982
983 let result1 = mgr.commit(tx1);
985 assert!(result1.is_ok(), "First commit should succeed");
986
987 let result2 = mgr.commit(tx2);
989 assert!(result2.is_err(), "Second commit should fail due to SSI");
990 assert!(
991 result2
992 .unwrap_err()
993 .to_string()
994 .contains("Serialization failure"),
995 "Expected serialization failure error for write skew prevention"
996 );
997 }
998
999 #[test]
1000 fn test_read_committed_allows_non_repeatable_reads() {
1001 let mgr = TransactionManager::new();
1002
1003 let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1005 let entity = NodeId::new(42);
1006
1007 mgr.record_read(tx1, entity).unwrap();
1009
1010 let tx2 = mgr.begin();
1012 mgr.record_write(tx2, entity).unwrap();
1013 mgr.commit(tx2).unwrap();
1014
1015 let result = mgr.commit(tx1);
1017 assert!(
1018 result.is_ok(),
1019 "ReadCommitted should allow non-repeatable reads"
1020 );
1021 }
1022
1023 #[test]
1024 fn test_isolation_level_debug() {
1025 assert_eq!(
1026 format!("{:?}", IsolationLevel::ReadCommitted),
1027 "ReadCommitted"
1028 );
1029 assert_eq!(
1030 format!("{:?}", IsolationLevel::SnapshotIsolation),
1031 "SnapshotIsolation"
1032 );
1033 assert_eq!(
1034 format!("{:?}", IsolationLevel::Serializable),
1035 "Serializable"
1036 );
1037 }
1038
1039 #[test]
1040 fn test_isolation_level_default_trait() {
1041 let default: IsolationLevel = Default::default();
1042 assert_eq!(default, IsolationLevel::SnapshotIsolation);
1043 }
1044
1045 #[test]
1046 fn test_ssi_concurrent_reads_no_conflict() {
1047 let mgr = TransactionManager::new();
1048
1049 let entity = NodeId::new(42);
1050
1051 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1053 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1054
1055 mgr.record_read(tx1, entity).unwrap();
1056 mgr.record_read(tx2, entity).unwrap();
1057
1058 assert!(mgr.commit(tx1).is_ok());
1060 assert!(mgr.commit(tx2).is_ok());
1061 }
1062
1063 #[test]
1064 fn test_ssi_write_write_conflict() {
1065 let mgr = TransactionManager::new();
1066
1067 let entity = NodeId::new(42);
1068
1069 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1071 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1072
1073 mgr.record_write(tx1, entity).unwrap();
1074 mgr.record_write(tx2, entity).unwrap();
1075
1076 assert!(mgr.commit(tx1).is_ok());
1078
1079 let result = mgr.commit(tx2);
1081 assert!(result.is_err());
1082 }
1083}