1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the transaction manager.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction has begun and has been neither committed nor aborted.
    Active,
    /// The transaction was committed.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
21
/// Isolation level requested for a transaction.
///
/// The default is [`IsolationLevel::SnapshotIsolation`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IsolationLevel {
    /// Read-committed isolation.
    ReadCommitted,

    /// Snapshot isolation; used when no level is given explicitly.
    #[default]
    SnapshotIsolation,

    /// Serializable isolation: at commit time the transaction's read set is
    /// validated in addition to its write set.
    Serializable,
}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum EntityId {
67 Node(NodeId),
69 Edge(EdgeId),
71}
72
73impl From<NodeId> for EntityId {
74 fn from(id: NodeId) -> Self {
75 Self::Node(id)
76 }
77}
78
79impl From<EdgeId> for EntityId {
80 fn from(id: EdgeId) -> Self {
81 Self::Edge(id)
82 }
83}
84
85pub struct TransactionInfo {
87 pub state: TransactionState,
89 pub isolation_level: IsolationLevel,
91 pub start_epoch: EpochId,
93 pub write_set: HashSet<EntityId>,
95 pub read_set: HashSet<EntityId>,
97}
98
99impl TransactionInfo {
100 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102 Self {
103 state: TransactionState::Active,
104 isolation_level,
105 start_epoch,
106 write_set: HashSet::new(),
107 read_set: HashSet::new(),
108 }
109 }
110}
111
/// Tracks transactions, their read/write sets, and the global commit epoch.
pub struct TransactionManager {
    /// Next transaction id to hand out; bumped by `begin_with_isolation`.
    next_transaction_id: AtomicU64,
    /// Current epoch; advanced by one on each successful `commit` and raised
    /// (never lowered) by `sync_epoch`.
    current_epoch: AtomicU64,
    /// Running count of active transactions: incremented when one begins,
    /// decremented on commit/abort. `record_write` reads it to skip the
    /// conflict scan when at most one transaction is active.
    active_count: AtomicU64,
    /// Per-transaction state, keyed by transaction id.
    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
    /// Commit epoch recorded for each committed transaction.
    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
}
126
127impl TransactionManager {
128 #[must_use]
130 pub fn new() -> Self {
131 Self {
132 next_transaction_id: AtomicU64::new(2),
135 current_epoch: AtomicU64::new(0),
136 active_count: AtomicU64::new(0),
137 transactions: RwLock::new(FxHashMap::default()),
138 committed_epochs: RwLock::new(FxHashMap::default()),
139 }
140 }
141
142 pub fn begin(&self) -> TransactionId {
144 self.begin_with_isolation(IsolationLevel::default())
145 }
146
147 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
149 let transaction_id =
150 TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
151 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
152
153 let info = TransactionInfo::new(epoch, isolation_level);
154 self.transactions.write().insert(transaction_id, info);
155 self.active_count.fetch_add(1, Ordering::Relaxed);
156 transaction_id
157 }
158
159 pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
161 self.transactions
162 .read()
163 .get(&transaction_id)
164 .map(|info| info.isolation_level)
165 }
166
167 pub fn record_write(
178 &self,
179 transaction_id: TransactionId,
180 entity: impl Into<EntityId>,
181 ) -> Result<()> {
182 let entity = entity.into();
183 let mut txns = self.transactions.write();
184 let info = txns.get(&transaction_id).ok_or_else(|| {
185 Error::Transaction(TransactionError::InvalidState(
186 "Transaction not found".to_string(),
187 ))
188 })?;
189
190 if info.state != TransactionState::Active {
191 return Err(Error::Transaction(TransactionError::InvalidState(
192 "Transaction is not active".to_string(),
193 )));
194 }
195
196 if self.active_count.load(Ordering::Relaxed) > 1 {
202 for (other_tx, other_info) in txns.iter() {
203 if *other_tx != transaction_id
204 && other_info.state == TransactionState::Active
205 && other_info.write_set.contains(&entity)
206 {
207 return Err(Error::Transaction(TransactionError::WriteConflict(
208 format!("Write-write conflict on entity {entity:?}"),
209 )));
210 }
211 }
212 }
213
214 let info = txns.get_mut(&transaction_id).expect("checked above");
216 info.write_set.insert(entity);
217 Ok(())
218 }
219
220 pub fn record_read(
226 &self,
227 transaction_id: TransactionId,
228 entity: impl Into<EntityId>,
229 ) -> Result<()> {
230 let mut txns = self.transactions.write();
231 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
232 Error::Transaction(TransactionError::InvalidState(
233 "Transaction not found".to_string(),
234 ))
235 })?;
236
237 if info.state != TransactionState::Active {
238 return Err(Error::Transaction(TransactionError::InvalidState(
239 "Transaction is not active".to_string(),
240 )));
241 }
242
243 info.read_set.insert(entity.into());
244 Ok(())
245 }
246
247 pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
265 let mut txns = self.transactions.write();
266 let committed = self.committed_epochs.read();
267
268 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
270 let info = txns.get(&transaction_id).ok_or_else(|| {
271 Error::Transaction(TransactionError::InvalidState(
272 "Transaction not found".to_string(),
273 ))
274 })?;
275
276 if info.state != TransactionState::Active {
277 return Err(Error::Transaction(TransactionError::InvalidState(
278 "Transaction is not active".to_string(),
279 )));
280 }
281
282 (
283 info.isolation_level,
284 info.start_epoch,
285 info.write_set.clone(),
286 info.read_set.clone(),
287 )
288 };
289
290 for (other_tx, commit_epoch) in committed.iter() {
295 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
296 if let Some(other_info) = txns.get(other_tx) {
298 for entity in &our_write_set {
299 if other_info.write_set.contains(entity) {
300 return Err(Error::Transaction(TransactionError::WriteConflict(
301 format!("Write-write conflict on entity {:?}", entity),
302 )));
303 }
304 }
305 }
306 }
307 }
308
309 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
314 for (other_tx, commit_epoch) in committed.iter() {
315 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
316 if let Some(other_info) = txns.get(other_tx) {
318 for entity in &our_read_set {
319 if other_info.write_set.contains(entity) {
320 return Err(Error::Transaction(
321 TransactionError::SerializationFailure(format!(
322 "Read-write conflict on entity {:?}: \
323 another transaction modified data we read",
324 entity
325 )),
326 ));
327 }
328 }
329 }
330 }
331 }
332
333 for (other_tx, other_info) in txns.iter() {
336 if *other_tx == transaction_id {
337 continue;
338 }
339 if other_info.state == TransactionState::Committed {
340 for entity in &our_read_set {
342 if other_info.write_set.contains(entity) {
343 if let Some(commit_epoch) = committed.get(other_tx)
345 && commit_epoch.as_u64() > our_start_epoch.as_u64()
346 {
347 return Err(Error::Transaction(
348 TransactionError::SerializationFailure(format!(
349 "Read-write conflict on entity {:?}: \
350 another transaction modified data we read",
351 entity
352 )),
353 ));
354 }
355 }
356 }
357 }
358 }
359 }
360
361 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
364
365 if let Some(info) = txns.get_mut(&transaction_id) {
367 info.state = TransactionState::Committed;
368 }
369 self.active_count.fetch_sub(1, Ordering::Relaxed);
370
371 drop(committed);
373 self.committed_epochs
374 .write()
375 .insert(transaction_id, commit_epoch);
376
377 Ok(commit_epoch)
378 }
379
380 pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
386 let mut txns = self.transactions.write();
387
388 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
389 Error::Transaction(TransactionError::InvalidState(
390 "Transaction not found".to_string(),
391 ))
392 })?;
393
394 if info.state != TransactionState::Active {
395 return Err(Error::Transaction(TransactionError::InvalidState(
396 "Transaction is not active".to_string(),
397 )));
398 }
399
400 info.state = TransactionState::Aborted;
401 self.active_count.fetch_sub(1, Ordering::Relaxed);
402 Ok(())
403 }
404
405 pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
410 let txns = self.transactions.read();
411 let info = txns.get(&transaction_id).ok_or_else(|| {
412 Error::Transaction(TransactionError::InvalidState(
413 "Transaction not found".to_string(),
414 ))
415 })?;
416 Ok(info.write_set.clone())
417 }
418
419 pub fn reset_write_set(
425 &self,
426 transaction_id: TransactionId,
427 write_set: HashSet<EntityId>,
428 ) -> Result<()> {
429 let mut txns = self.transactions.write();
430 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
431 Error::Transaction(TransactionError::InvalidState(
432 "Transaction not found".to_string(),
433 ))
434 })?;
435 info.write_set = write_set;
436 Ok(())
437 }
438
439 pub fn abort_all_active(&self) {
443 let mut txns = self.transactions.write();
444 for info in txns.values_mut() {
445 if info.state == TransactionState::Active {
446 info.state = TransactionState::Aborted;
447 self.active_count.fetch_sub(1, Ordering::Relaxed);
448 }
449 }
450 }
451
452 pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
454 self.transactions
455 .read()
456 .get(&transaction_id)
457 .map(|info| info.state)
458 }
459
460 pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
462 self.transactions
463 .read()
464 .get(&transaction_id)
465 .map(|info| info.start_epoch)
466 }
467
468 #[must_use]
470 pub fn current_epoch(&self) -> EpochId {
471 EpochId::new(self.current_epoch.load(Ordering::Acquire))
472 }
473
474 pub fn sync_epoch(&self, epoch: EpochId) {
479 self.current_epoch
480 .fetch_max(epoch.as_u64(), Ordering::SeqCst);
481 }
482
483 #[must_use]
488 pub fn min_active_epoch(&self) -> EpochId {
489 let txns = self.transactions.read();
490 txns.values()
491 .filter(|info| info.state == TransactionState::Active)
492 .map(|info| info.start_epoch)
493 .min()
494 .unwrap_or_else(|| self.current_epoch())
495 }
496
497 #[must_use]
499 pub fn active_count(&self) -> usize {
500 self.transactions
501 .read()
502 .values()
503 .filter(|info| info.state == TransactionState::Active)
504 .count()
505 }
506
507 pub fn gc(&self) -> usize {
515 let mut txns = self.transactions.write();
516 let mut committed = self.committed_epochs.write();
517
518 let min_active_start = txns
520 .values()
521 .filter(|info| info.state == TransactionState::Active)
522 .map(|info| info.start_epoch)
523 .min();
524
525 let initial_count = txns.len();
526
527 let to_remove: Vec<TransactionId> = txns
529 .iter()
530 .filter(|(transaction_id, info)| {
531 match info.state {
532 TransactionState::Active => false, TransactionState::Aborted => true, TransactionState::Committed => {
535 if let Some(min_start) = min_active_start {
538 if let Some(commit_epoch) = committed.get(*transaction_id) {
539 commit_epoch.as_u64() < min_start.as_u64()
541 } else {
542 false
544 }
545 } else {
546 true
548 }
549 }
550 }
551 })
552 .map(|(id, _)| *id)
553 .collect();
554
555 for id in &to_remove {
556 txns.remove(id);
557 committed.remove(id);
558 }
559
560 initial_count - txns.len()
561 }
562
563 pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
567 self.committed_epochs.write().insert(transaction_id, epoch);
568 }
569
570 #[must_use]
574 pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
575 let next = self.next_transaction_id.load(Ordering::Relaxed);
576 if next > 1 {
577 Some(TransactionId::new(next - 1))
578 } else {
579 None
580 }
581 }
582}
583
584impl Default for TransactionManager {
585 fn default() -> Self {
586 Self::new()
587 }
588}
589
590#[cfg(test)]
591mod tests {
592 use super::*;
593
/// A transaction is `Active` after `begin` and `Committed` after `commit`,
/// and the commit epoch is non-zero.
#[test]
fn test_begin_commit() {
    let manager = TransactionManager::new();

    let txn = manager.begin();
    assert_eq!(manager.state(txn), Some(TransactionState::Active));

    let epoch = manager.commit(txn).unwrap();
    assert_eq!(manager.state(txn), Some(TransactionState::Committed));
    assert!(epoch.as_u64() > 0);
}
605
/// Aborting an active transaction moves it to `Aborted`.
#[test]
fn test_begin_abort() {
    let manager = TransactionManager::new();

    let txn = manager.begin();
    manager.abort(txn).unwrap();

    assert_eq!(manager.state(txn), Some(TransactionState::Aborted));
}
614
/// A commit moves the manager's epoch forward, and the returned commit epoch
/// is later than the epoch seen before the commit.
#[test]
fn test_epoch_advancement() {
    let manager = TransactionManager::new();
    let before = manager.current_epoch();

    let txn = manager.begin();
    let committed_at = manager.commit(txn).unwrap();

    assert!(manager.current_epoch().as_u64() > before.as_u64());
    assert!(committed_at.as_u64() > before.as_u64());
}
627
/// `gc` must keep a committed transaction whose commit epoch is not below the
/// start epoch of a transaction that is still active.
#[test]
fn test_gc_preserves_needed_write_sets() {
    let manager = TransactionManager::new();

    // Both start at the same epoch; only the first commits.
    let finished = manager.begin();
    let still_open = manager.begin();
    manager.commit(finished).unwrap();
    assert_eq!(manager.active_count(), 1);

    let removed = manager.gc();
    assert_eq!(removed, 0);

    assert_eq!(manager.state(finished), Some(TransactionState::Committed));
    assert_eq!(manager.state(still_open), Some(TransactionState::Active));
}
648
/// `gc` removes committed transactions whose commit epoch is below the oldest
/// active start epoch, and removes all committed ones once nothing is active.
#[test]
fn test_gc_removes_old_commits() {
    let manager = TransactionManager::new();

    // Two transactions that start and commit back to back.
    let first = manager.begin();
    manager.commit(first).unwrap();
    let second = manager.begin();
    manager.commit(second).unwrap();

    // A third one stays open while the first collection runs.
    let third = manager.begin();

    let removed = manager.gc();
    assert_eq!(removed, 1);
    assert_eq!(manager.state(first), None);
    assert_eq!(manager.state(second), Some(TransactionState::Committed));
    assert_eq!(manager.state(third), Some(TransactionState::Active));

    // With nothing active any more, both remaining records can go.
    manager.commit(third).unwrap();
    let removed = manager.gc();
    assert_eq!(removed, 2);
}
679
/// `gc` removes aborted transactions even while another one is active.
#[test]
fn test_gc_removes_aborted() {
    let manager = TransactionManager::new();

    let aborted = manager.begin();
    let still_open = manager.begin();
    manager.abort(aborted).unwrap();

    let removed = manager.gc();
    assert_eq!(removed, 1);

    assert_eq!(manager.state(aborted), None);
    assert_eq!(manager.state(still_open), Some(TransactionState::Active));
}
697
/// A single transaction can record writes to nodes and edges and then commit.
#[test]
fn test_write_tracking() {
    let manager = TransactionManager::new();
    let txn = manager.begin();

    manager.record_write(txn, NodeId::new(1)).unwrap();
    manager.record_write(txn, NodeId::new(2)).unwrap();
    manager.record_write(txn, EdgeId::new(100)).unwrap();

    assert!(manager.commit(txn).is_ok());
}
712
/// `min_active_epoch` falls back to the current epoch when nothing is active
/// and otherwise reports the oldest active start epoch.
#[test]
fn test_min_active_epoch() {
    let manager = TransactionManager::new();

    // Nothing active yet.
    assert_eq!(manager.min_active_epoch(), manager.current_epoch());

    let oldest = manager.begin();
    let oldest_start = manager.start_epoch(oldest).unwrap();

    // A commit in between advances the epoch for later transactions.
    let middle = manager.begin();
    manager.commit(middle).unwrap();

    let _newest = manager.begin();

    assert_eq!(manager.min_active_epoch(), oldest_start);
}
733
/// `abort_all_active` aborts active transactions and leaves committed ones
/// untouched.
#[test]
fn test_abort_all_active() {
    let manager = TransactionManager::new();

    let committed = manager.begin();
    let open_a = manager.begin();
    let open_b = manager.begin();

    manager.commit(committed).unwrap();
    manager.abort_all_active();

    assert_eq!(manager.state(committed), Some(TransactionState::Committed));
    assert_eq!(manager.state(open_a), Some(TransactionState::Aborted));
    assert_eq!(manager.state(open_b), Some(TransactionState::Aborted));
}
751
/// A transaction started after a commit observes a later start epoch than one
/// started before it.
#[test]
fn test_start_epoch_snapshot() {
    let manager = TransactionManager::new();

    let earlier = manager.begin();
    let earlier_start = manager.start_epoch(earlier).unwrap();
    manager.commit(earlier).unwrap();

    let later = manager.begin();
    let later_start = manager.start_epoch(later).unwrap();

    assert!(later_start.as_u64() > earlier_start.as_u64());
}
770
/// A second active transaction writing the same entity is rejected at
/// `record_write`, and the first writer can still commit.
#[test]
fn test_write_write_conflict_detection() {
    let manager = TransactionManager::new();

    let winner = manager.begin();
    let loser = manager.begin();
    let contested = NodeId::new(42);

    manager.record_write(winner, contested).unwrap();

    let attempt = manager.record_write(loser, contested);
    assert!(attempt.is_err());
    let message = attempt.unwrap_err().to_string();
    assert!(
        message.contains("Write-write conflict"),
        "Expected write-write conflict error"
    );

    let outcome = manager.commit(winner);
    assert!(outcome.is_ok());
}
798
/// Sequential commits receive strictly increasing commit epochs.
#[test]
fn test_commit_epoch_monotonicity() {
    let manager = TransactionManager::new();

    let epochs: Vec<u64> = (0..10)
        .map(|_| {
            let txn = manager.begin();
            manager.commit(txn).unwrap().as_u64()
        })
        .collect();

    for (prev_idx, pair) in epochs.windows(2).enumerate() {
        let idx = prev_idx + 1;
        assert!(
            pair[1] > pair[0],
            "Epoch {} ({}) should be greater than epoch {} ({})",
            idx,
            pair[1],
            prev_idx,
            pair[0]
        );
    }
}
824
/// Commits issued from several threads all get distinct epochs, and the final
/// epoch equals the total number of commits.
#[test]
fn test_concurrent_commits_via_threads() {
    use std::sync::Arc;
    use std::thread;

    let mgr = Arc::new(TransactionManager::new());
    let num_threads = 10;
    let commits_per_thread = 100;

    let mut workers = Vec::new();
    for _ in 0..num_threads {
        let shared = Arc::clone(&mgr);
        workers.push(thread::spawn(move || {
            (0..commits_per_thread)
                .map(|_| {
                    let txn = shared.begin();
                    shared.commit(txn).unwrap().as_u64()
                })
                .collect::<Vec<u64>>()
        }));
    }

    let mut all_epochs: Vec<u64> = Vec::new();
    for worker in workers {
        all_epochs.extend(worker.join().unwrap());
    }

    // After sorting, `dedup` would shrink the vector if any epoch repeated.
    all_epochs.sort_unstable();
    let total_before_dedup = all_epochs.len();
    all_epochs.dedup();
    assert_eq!(
        all_epochs.len(),
        total_before_dedup,
        "All commit epochs should be unique"
    );

    assert_eq!(
        mgr.current_epoch().as_u64(),
        (num_threads * commits_per_thread) as u64,
        "Final epoch should equal total commits"
    );
}
871
/// `begin` uses snapshot isolation.
#[test]
fn test_isolation_level_default() {
    let manager = TransactionManager::new();
    let txn = manager.begin();

    let level = manager.isolation_level(txn);
    assert_eq!(level, Some(IsolationLevel::SnapshotIsolation));
}
882
/// `begin_with_isolation` stores exactly the level it was given.
#[test]
fn test_isolation_level_explicit() {
    let manager = TransactionManager::new();

    let levels = [
        IsolationLevel::ReadCommitted,
        IsolationLevel::SnapshotIsolation,
        IsolationLevel::Serializable,
    ];

    // Start all three first, then check each one, as separate transactions.
    let started: Vec<_> = levels
        .iter()
        .map(|level| (manager.begin_with_isolation(*level), *level))
        .collect();

    for (txn, expected) in started {
        assert_eq!(manager.isolation_level(txn), Some(expected));
    }
}
904
/// A serializable reader fails to commit when a concurrent transaction wrote
/// the entity it read and committed first.
#[test]
fn test_ssi_read_write_conflict_detected() {
    let manager = TransactionManager::new();

    let reader = manager.begin_with_isolation(IsolationLevel::Serializable);
    let writer = manager.begin();
    let shared = NodeId::new(42);

    manager.record_read(reader, shared).unwrap();

    manager.record_write(writer, shared).unwrap();
    manager.commit(writer).unwrap();

    let outcome = manager.commit(reader);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("Serialization failure"),
        "Expected serialization failure error"
    );
}
934
/// Under the default (snapshot) level, a concurrent committed write to an
/// entity we only read does not block our commit.
#[test]
fn test_ssi_no_conflict_when_not_serializable() {
    let manager = TransactionManager::new();

    let reader = manager.begin();
    let writer = manager.begin();
    let shared = NodeId::new(42);

    manager.record_read(reader, shared).unwrap();

    manager.record_write(writer, shared).unwrap();
    manager.commit(writer).unwrap();

    let outcome = manager.commit(reader);
    assert!(
        outcome.is_ok(),
        "Snapshot Isolation should not detect read-write conflicts"
    );
}
960
/// A serializable reader that starts after the writer committed sees no
/// conflict.
#[test]
fn test_ssi_no_conflict_when_write_before_read() {
    let manager = TransactionManager::new();
    let shared = NodeId::new(42);

    // The writer finishes before the reader even begins.
    let writer = manager.begin();
    manager.record_write(writer, shared).unwrap();
    manager.commit(writer).unwrap();

    let reader = manager.begin_with_isolation(IsolationLevel::Serializable);
    manager.record_read(reader, shared).unwrap();

    let outcome = manager.commit(reader);
    assert!(
        outcome.is_ok(),
        "Should not conflict when writer committed before reader started"
    );
}
982
/// Two serializable transactions read the same two entities and each writes a
/// different one; the first commit succeeds and the second is rejected.
#[test]
fn test_write_skew_prevented_by_ssi() {
    let manager = TransactionManager::new();

    let account_a = NodeId::new(1);
    let account_b = NodeId::new(2);

    let first = manager.begin_with_isolation(IsolationLevel::Serializable);
    let second = manager.begin_with_isolation(IsolationLevel::Serializable);

    // Both transactions read both accounts.
    manager.record_read(first, account_a).unwrap();
    manager.record_read(first, account_b).unwrap();
    manager.record_read(second, account_a).unwrap();
    manager.record_read(second, account_b).unwrap();

    // Each writes a different account, so there is no write-write overlap.
    manager.record_write(first, account_a).unwrap();
    manager.record_write(second, account_b).unwrap();

    let first_outcome = manager.commit(first);
    assert!(first_outcome.is_ok(), "First commit should succeed");

    let second_outcome = manager.commit(second);
    assert!(second_outcome.is_err(), "Second commit should fail due to SSI");
    let message = second_outcome.unwrap_err().to_string();
    assert!(
        message.contains("Serialization failure"),
        "Expected serialization failure error for write skew prevention"
    );
}
1025
/// A read-committed transaction commits even though another transaction wrote
/// and committed an entity it had read.
#[test]
fn test_read_committed_allows_non_repeatable_reads() {
    let manager = TransactionManager::new();
    let shared = NodeId::new(42);

    let reader = manager.begin_with_isolation(IsolationLevel::ReadCommitted);
    manager.record_read(reader, shared).unwrap();

    let writer = manager.begin();
    manager.record_write(writer, shared).unwrap();
    manager.commit(writer).unwrap();

    let outcome = manager.commit(reader);
    assert!(
        outcome.is_ok(),
        "ReadCommitted should allow non-repeatable reads"
    );
}
1049
/// The `Debug` output of each isolation level is its variant name.
#[test]
fn test_isolation_level_debug() {
    let expectations = [
        (IsolationLevel::ReadCommitted, "ReadCommitted"),
        (IsolationLevel::SnapshotIsolation, "SnapshotIsolation"),
        (IsolationLevel::Serializable, "Serializable"),
    ];

    for (level, expected) in expectations {
        assert_eq!(format!("{:?}", level), expected);
    }
}
1065
/// `Default` for `IsolationLevel` yields snapshot isolation.
#[test]
fn test_isolation_level_default_trait() {
    let level = IsolationLevel::default();
    assert_eq!(level, IsolationLevel::SnapshotIsolation);
}
1071
/// Two serializable transactions that only read the same entity can both
/// commit.
#[test]
fn test_ssi_concurrent_reads_no_conflict() {
    let manager = TransactionManager::new();
    let shared = NodeId::new(42);

    let reader_a = manager.begin_with_isolation(IsolationLevel::Serializable);
    let reader_b = manager.begin_with_isolation(IsolationLevel::Serializable);

    manager.record_read(reader_a, shared).unwrap();
    manager.record_read(reader_b, shared).unwrap();

    assert!(manager.commit(reader_a).is_ok());
    assert!(manager.commit(reader_b).is_ok());
}
1089
/// Write-write conflicts are rejected at `record_write` for serializable
/// transactions too, and the first writer can still commit.
#[test]
fn test_ssi_write_write_conflict() {
    let manager = TransactionManager::new();
    let contested = NodeId::new(42);

    let winner = manager.begin_with_isolation(IsolationLevel::Serializable);
    let loser = manager.begin_with_isolation(IsolationLevel::Serializable);

    manager.record_write(winner, contested).unwrap();

    let attempt = manager.record_write(loser, contested);
    assert!(
        attempt.is_err(),
        "Second record_write should fail with write-write conflict"
    );

    assert!(manager.commit(winner).is_ok());
}
1113}