1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the transaction manager.
///
/// A transaction is created in [`TransactionState::Active`] and moves exactly once to
/// either [`TransactionState::Committed`] or [`TransactionState::Aborted`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction has begun and has neither committed nor aborted yet.
    Active,
    /// The transaction committed successfully.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
21
/// Isolation level a transaction runs under.
///
/// Within this file the level is consulted only when a transaction commits: read-set
/// validation runs for [`IsolationLevel::Serializable`] and is skipped for the other levels.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IsolationLevel {
    /// Read-committed isolation.
    ///
    /// NOTE(review): the manager's commit path treats this exactly like snapshot isolation;
    /// any behavioural difference presumably lives in the read path elsewhere — confirm.
    ReadCommitted,

    /// Snapshot isolation. This is the default level.
    #[default]
    SnapshotIsolation,

    /// Serializable isolation: at commit, the transaction's read set is additionally
    /// validated against writes of transactions that committed after it started.
    Serializable,
}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum EntityId {
67 Node(NodeId),
69 Edge(EdgeId),
71}
72
73impl From<NodeId> for EntityId {
74 fn from(id: NodeId) -> Self {
75 Self::Node(id)
76 }
77}
78
79impl From<EdgeId> for EntityId {
80 fn from(id: EdgeId) -> Self {
81 Self::Edge(id)
82 }
83}
84
85pub struct TransactionInfo {
87 pub state: TransactionState,
89 pub isolation_level: IsolationLevel,
91 pub start_epoch: EpochId,
93 pub write_set: HashSet<EntityId>,
95 pub read_set: HashSet<EntityId>,
97}
98
99impl TransactionInfo {
100 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102 Self {
103 state: TransactionState::Active,
104 isolation_level,
105 start_epoch,
106 write_set: HashSet::new(),
107 read_set: HashSet::new(),
108 }
109 }
110}
111
112pub struct TransactionManager {
114 next_transaction_id: AtomicU64,
116 current_epoch: AtomicU64,
118 active_count: AtomicU64,
120 transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
122 committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
125}
126
127impl TransactionManager {
128 #[must_use]
130 pub fn new() -> Self {
131 Self {
132 next_transaction_id: AtomicU64::new(2),
135 current_epoch: AtomicU64::new(0),
136 active_count: AtomicU64::new(0),
137 transactions: RwLock::new(FxHashMap::default()),
138 committed_epochs: RwLock::new(FxHashMap::default()),
139 }
140 }
141
142 pub fn begin(&self) -> TransactionId {
144 self.begin_with_isolation(IsolationLevel::default())
145 }
146
147 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
149 let transaction_id =
150 TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
151 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
152
153 let info = TransactionInfo::new(epoch, isolation_level);
154 self.transactions.write().insert(transaction_id, info);
155 self.active_count.fetch_add(1, Ordering::Relaxed);
156 transaction_id
157 }
158
159 pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
161 self.transactions
162 .read()
163 .get(&transaction_id)
164 .map(|info| info.isolation_level)
165 }
166
167 pub fn record_write(
178 &self,
179 transaction_id: TransactionId,
180 entity: impl Into<EntityId>,
181 ) -> Result<()> {
182 let entity = entity.into();
183 let mut txns = self.transactions.write();
184
185 if self.active_count.load(Ordering::Relaxed) > 1 {
188 for (other_tx, other_info) in txns.iter() {
189 if *other_tx != transaction_id
190 && other_info.state == TransactionState::Active
191 && other_info.write_set.contains(&entity)
192 {
193 return Err(Error::Transaction(TransactionError::WriteConflict(
194 format!("Write-write conflict on entity {entity:?}"),
195 )));
196 }
197 }
198 }
199
200 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
202 Error::Transaction(TransactionError::InvalidState(
203 "Transaction not found".to_string(),
204 ))
205 })?;
206
207 if info.state != TransactionState::Active {
208 return Err(Error::Transaction(TransactionError::InvalidState(
209 "Transaction is not active".to_string(),
210 )));
211 }
212
213 info.write_set.insert(entity);
214 Ok(())
215 }
216
217 pub fn record_read(
223 &self,
224 transaction_id: TransactionId,
225 entity: impl Into<EntityId>,
226 ) -> Result<()> {
227 let mut txns = self.transactions.write();
228 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
229 Error::Transaction(TransactionError::InvalidState(
230 "Transaction not found".to_string(),
231 ))
232 })?;
233
234 if info.state != TransactionState::Active {
235 return Err(Error::Transaction(TransactionError::InvalidState(
236 "Transaction is not active".to_string(),
237 )));
238 }
239
240 info.read_set.insert(entity.into());
241 Ok(())
242 }
243
244 pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
262 let mut txns = self.transactions.write();
263 let committed = self.committed_epochs.read();
264
265 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
267 let info = txns.get(&transaction_id).ok_or_else(|| {
268 Error::Transaction(TransactionError::InvalidState(
269 "Transaction not found".to_string(),
270 ))
271 })?;
272
273 if info.state != TransactionState::Active {
274 return Err(Error::Transaction(TransactionError::InvalidState(
275 "Transaction is not active".to_string(),
276 )));
277 }
278
279 (
280 info.isolation_level,
281 info.start_epoch,
282 info.write_set.clone(),
283 info.read_set.clone(),
284 )
285 };
286
287 for (other_tx, commit_epoch) in committed.iter() {
292 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
293 if let Some(other_info) = txns.get(other_tx) {
295 for entity in &our_write_set {
296 if other_info.write_set.contains(entity) {
297 return Err(Error::Transaction(TransactionError::WriteConflict(
298 format!("Write-write conflict on entity {:?}", entity),
299 )));
300 }
301 }
302 }
303 }
304 }
305
306 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
311 for (other_tx, commit_epoch) in committed.iter() {
312 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
313 if let Some(other_info) = txns.get(other_tx) {
315 for entity in &our_read_set {
316 if other_info.write_set.contains(entity) {
317 return Err(Error::Transaction(
318 TransactionError::SerializationFailure(format!(
319 "Read-write conflict on entity {:?}: \
320 another transaction modified data we read",
321 entity
322 )),
323 ));
324 }
325 }
326 }
327 }
328 }
329
330 for (other_tx, other_info) in txns.iter() {
333 if *other_tx == transaction_id {
334 continue;
335 }
336 if other_info.state == TransactionState::Committed {
337 for entity in &our_read_set {
339 if other_info.write_set.contains(entity) {
340 if let Some(commit_epoch) = committed.get(other_tx)
342 && commit_epoch.as_u64() > our_start_epoch.as_u64()
343 {
344 return Err(Error::Transaction(
345 TransactionError::SerializationFailure(format!(
346 "Read-write conflict on entity {:?}: \
347 another transaction modified data we read",
348 entity
349 )),
350 ));
351 }
352 }
353 }
354 }
355 }
356 }
357
358 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
361
362 if let Some(info) = txns.get_mut(&transaction_id) {
364 info.state = TransactionState::Committed;
365 }
366 self.active_count.fetch_sub(1, Ordering::Relaxed);
367
368 drop(committed);
370 self.committed_epochs
371 .write()
372 .insert(transaction_id, commit_epoch);
373
374 Ok(commit_epoch)
375 }
376
377 pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
383 let mut txns = self.transactions.write();
384
385 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
386 Error::Transaction(TransactionError::InvalidState(
387 "Transaction not found".to_string(),
388 ))
389 })?;
390
391 if info.state != TransactionState::Active {
392 return Err(Error::Transaction(TransactionError::InvalidState(
393 "Transaction is not active".to_string(),
394 )));
395 }
396
397 info.state = TransactionState::Aborted;
398 self.active_count.fetch_sub(1, Ordering::Relaxed);
399 Ok(())
400 }
401
402 pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
407 let txns = self.transactions.read();
408 let info = txns.get(&transaction_id).ok_or_else(|| {
409 Error::Transaction(TransactionError::InvalidState(
410 "Transaction not found".to_string(),
411 ))
412 })?;
413 Ok(info.write_set.clone())
414 }
415
416 pub fn reset_write_set(
422 &self,
423 transaction_id: TransactionId,
424 write_set: HashSet<EntityId>,
425 ) -> Result<()> {
426 let mut txns = self.transactions.write();
427 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
428 Error::Transaction(TransactionError::InvalidState(
429 "Transaction not found".to_string(),
430 ))
431 })?;
432 info.write_set = write_set;
433 Ok(())
434 }
435
436 pub fn abort_all_active(&self) {
440 let mut txns = self.transactions.write();
441 for info in txns.values_mut() {
442 if info.state == TransactionState::Active {
443 info.state = TransactionState::Aborted;
444 self.active_count.fetch_sub(1, Ordering::Relaxed);
445 }
446 }
447 }
448
449 pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
451 self.transactions
452 .read()
453 .get(&transaction_id)
454 .map(|info| info.state)
455 }
456
457 pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
459 self.transactions
460 .read()
461 .get(&transaction_id)
462 .map(|info| info.start_epoch)
463 }
464
465 #[must_use]
467 pub fn current_epoch(&self) -> EpochId {
468 EpochId::new(self.current_epoch.load(Ordering::Acquire))
469 }
470
471 pub fn sync_epoch(&self, epoch: EpochId) {
476 self.current_epoch
477 .fetch_max(epoch.as_u64(), Ordering::SeqCst);
478 }
479
480 #[must_use]
485 pub fn min_active_epoch(&self) -> EpochId {
486 let txns = self.transactions.read();
487 txns.values()
488 .filter(|info| info.state == TransactionState::Active)
489 .map(|info| info.start_epoch)
490 .min()
491 .unwrap_or_else(|| self.current_epoch())
492 }
493
494 #[must_use]
496 pub fn active_count(&self) -> usize {
497 self.transactions
498 .read()
499 .values()
500 .filter(|info| info.state == TransactionState::Active)
501 .count()
502 }
503
504 pub fn gc(&self) -> usize {
512 let mut txns = self.transactions.write();
513 let mut committed = self.committed_epochs.write();
514
515 let min_active_start = txns
517 .values()
518 .filter(|info| info.state == TransactionState::Active)
519 .map(|info| info.start_epoch)
520 .min();
521
522 let initial_count = txns.len();
523
524 let to_remove: Vec<TransactionId> = txns
526 .iter()
527 .filter(|(transaction_id, info)| {
528 match info.state {
529 TransactionState::Active => false, TransactionState::Aborted => true, TransactionState::Committed => {
532 if let Some(min_start) = min_active_start {
535 if let Some(commit_epoch) = committed.get(*transaction_id) {
536 commit_epoch.as_u64() < min_start.as_u64()
538 } else {
539 false
541 }
542 } else {
543 true
545 }
546 }
547 }
548 })
549 .map(|(id, _)| *id)
550 .collect();
551
552 for id in &to_remove {
553 txns.remove(id);
554 committed.remove(id);
555 }
556
557 initial_count - txns.len()
558 }
559
560 pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
564 self.committed_epochs.write().insert(transaction_id, epoch);
565 }
566
567 #[must_use]
571 pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
572 let next = self.next_transaction_id.load(Ordering::Relaxed);
573 if next > 1 {
574 Some(TransactionId::new(next - 1))
575 } else {
576 None
577 }
578 }
579}
580
581impl Default for TransactionManager {
582 fn default() -> Self {
583 Self::new()
584 }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Begin followed by commit moves the transaction from `Active` to `Committed`.
    #[test]
    fn test_begin_commit() {
        let manager = TransactionManager::new();

        let txn = manager.begin();
        assert_eq!(manager.state(txn), Some(TransactionState::Active));

        let epoch = manager.commit(txn).unwrap();
        assert_eq!(manager.state(txn), Some(TransactionState::Committed));
        assert!(epoch.as_u64() > 0);
    }

    /// Begin followed by abort leaves the transaction `Aborted`.
    #[test]
    fn test_begin_abort() {
        let manager = TransactionManager::new();

        let txn = manager.begin();
        manager.abort(txn).unwrap();

        assert_eq!(manager.state(txn), Some(TransactionState::Aborted));
    }

    /// A commit advances the manager's epoch past its previous value.
    #[test]
    fn test_epoch_advancement() {
        let manager = TransactionManager::new();
        let epoch_before = manager.current_epoch().as_u64();

        let txn = manager.begin();
        let commit_epoch = manager.commit(txn).unwrap().as_u64();

        assert!(manager.current_epoch().as_u64() > epoch_before);
        assert!(commit_epoch > epoch_before);
    }

    /// A committed transaction is kept while a concurrent transaction is still active.
    #[test]
    fn test_gc_preserves_needed_write_sets() {
        let manager = TransactionManager::new();

        let first = manager.begin();
        let second = manager.begin();

        manager.commit(first).unwrap();
        assert_eq!(manager.active_count(), 1);

        let removed = manager.gc();
        assert_eq!(removed, 0);

        assert_eq!(manager.state(first), Some(TransactionState::Committed));
        assert_eq!(manager.state(second), Some(TransactionState::Active));
    }

    /// Commits older than every active transaction's start epoch are collected.
    #[test]
    fn test_gc_removes_old_commits() {
        let manager = TransactionManager::new();

        let first = manager.begin();
        manager.commit(first).unwrap();

        let second = manager.begin();
        manager.commit(second).unwrap();

        let third = manager.begin();

        let removed = manager.gc();
        assert_eq!(removed, 1);
        assert_eq!(manager.state(first), None);
        assert_eq!(manager.state(second), Some(TransactionState::Committed));
        assert_eq!(manager.state(third), Some(TransactionState::Active));

        // With nothing active any more, every remaining record is collectable.
        manager.commit(third).unwrap();
        let removed = manager.gc();
        assert_eq!(removed, 2);
    }

    /// Aborted transactions are collected even while others are active.
    #[test]
    fn test_gc_removes_aborted() {
        let manager = TransactionManager::new();

        let aborted = manager.begin();
        let running = manager.begin();

        manager.abort(aborted).unwrap();
        let removed = manager.gc();
        assert_eq!(removed, 1);

        assert_eq!(manager.state(aborted), None);
        assert_eq!(manager.state(running), Some(TransactionState::Active));
    }

    /// Writes to distinct entities by a single transaction commit cleanly.
    #[test]
    fn test_write_tracking() {
        let manager = TransactionManager::new();
        let txn = manager.begin();

        manager.record_write(txn, NodeId::new(1)).unwrap();
        manager.record_write(txn, NodeId::new(2)).unwrap();
        manager.record_write(txn, EdgeId::new(100)).unwrap();

        assert!(manager.commit(txn).is_ok());
    }

    /// The minimum active epoch is the start epoch of the oldest active transaction.
    #[test]
    fn test_min_active_epoch() {
        let manager = TransactionManager::new();

        // No active transactions: falls back to the current epoch.
        assert_eq!(manager.min_active_epoch(), manager.current_epoch());

        let oldest = manager.begin();
        let oldest_start = manager.start_epoch(oldest).unwrap();

        let committed = manager.begin();
        manager.commit(committed).unwrap();

        let _newest = manager.begin();

        assert_eq!(manager.min_active_epoch(), oldest_start);
    }

    /// `abort_all_active` aborts only transactions that are still active.
    #[test]
    fn test_abort_all_active() {
        let manager = TransactionManager::new();

        let committed = manager.begin();
        let running_a = manager.begin();
        let running_b = manager.begin();

        manager.commit(committed).unwrap();
        manager.abort_all_active();

        assert_eq!(manager.state(committed), Some(TransactionState::Committed));
        assert_eq!(manager.state(running_a), Some(TransactionState::Aborted));
        assert_eq!(manager.state(running_b), Some(TransactionState::Aborted));
    }

    /// A transaction started after a commit gets a later start epoch.
    #[test]
    fn test_start_epoch_snapshot() {
        let manager = TransactionManager::new();

        let earlier = manager.begin();
        let earlier_start = manager.start_epoch(earlier).unwrap().as_u64();

        manager.commit(earlier).unwrap();

        let later = manager.begin();
        let later_start = manager.start_epoch(later).unwrap().as_u64();

        assert!(later_start > earlier_start);
    }

    /// A second active writer on the same entity is rejected at `record_write`.
    #[test]
    fn test_write_write_conflict_detection() {
        let manager = TransactionManager::new();

        let winner = manager.begin();
        let loser = manager.begin();

        let entity = NodeId::new(42);
        manager.record_write(winner, entity).unwrap();

        let second_write = manager.record_write(loser, entity);
        assert!(second_write.is_err());
        let message = second_write.unwrap_err().to_string();
        assert!(
            message.contains("Write-write conflict"),
            "Expected write-write conflict error"
        );

        let first_commit = manager.commit(winner);
        assert!(first_commit.is_ok());
    }

    /// Sequential commits receive strictly increasing epochs.
    #[test]
    fn test_commit_epoch_monotonicity() {
        let manager = TransactionManager::new();

        let epochs: Vec<u64> = (0..10)
            .map(|_| {
                let txn = manager.begin();
                manager.commit(txn).unwrap().as_u64()
            })
            .collect();

        for (prev_index, pair) in epochs.windows(2).enumerate() {
            let (previous, current) = (pair[0], pair[1]);
            assert!(
                current > previous,
                "Epoch {} ({}) should be greater than epoch {} ({})",
                prev_index + 1,
                current,
                prev_index,
                previous
            );
        }
    }

    /// Commits racing on several threads all receive distinct epochs.
    #[test]
    fn test_concurrent_commits_via_threads() {
        use std::sync::Arc;
        use std::thread;

        const THREADS: u64 = 10;
        const COMMITS_PER_THREAD: u64 = 100;

        let manager = Arc::new(TransactionManager::new());

        let mut workers = Vec::new();
        for _ in 0..THREADS {
            let manager = Arc::clone(&manager);
            workers.push(thread::spawn(move || {
                (0..COMMITS_PER_THREAD)
                    .map(|_| {
                        let txn = manager.begin();
                        manager.commit(txn).unwrap().as_u64()
                    })
                    .collect::<Vec<u64>>()
            }));
        }

        let mut all_epochs: Vec<u64> = Vec::new();
        for worker in workers {
            all_epochs.extend(worker.join().unwrap());
        }

        all_epochs.sort_unstable();
        let total = all_epochs.len();
        all_epochs.dedup();
        assert_eq!(all_epochs.len(), total, "All commit epochs should be unique");

        assert_eq!(
            manager.current_epoch().as_u64(),
            THREADS * COMMITS_PER_THREAD,
            "Final epoch should equal total commits"
        );
    }

    /// `begin` uses snapshot isolation.
    #[test]
    fn test_isolation_level_default() {
        let manager = TransactionManager::new();

        let txn = manager.begin();

        let level = manager.isolation_level(txn);
        assert_eq!(level, Some(IsolationLevel::SnapshotIsolation));
    }

    /// `begin_with_isolation` stores exactly the requested level.
    #[test]
    fn test_isolation_level_explicit() {
        let manager = TransactionManager::new();

        let read_committed = manager.begin_with_isolation(IsolationLevel::ReadCommitted);
        let snapshot = manager.begin_with_isolation(IsolationLevel::SnapshotIsolation);
        let serializable = manager.begin_with_isolation(IsolationLevel::Serializable);

        let expectations = [
            (read_committed, IsolationLevel::ReadCommitted),
            (snapshot, IsolationLevel::SnapshotIsolation),
            (serializable, IsolationLevel::Serializable),
        ];
        for (txn, expected) in expectations {
            assert_eq!(manager.isolation_level(txn), Some(expected));
        }
    }

    /// A serializable reader fails to commit after a concurrent writer committed.
    #[test]
    fn test_ssi_read_write_conflict_detected() {
        let manager = TransactionManager::new();

        let reader = manager.begin_with_isolation(IsolationLevel::Serializable);
        let writer = manager.begin();

        let entity = NodeId::new(42);
        manager.record_read(reader, entity).unwrap();

        manager.record_write(writer, entity).unwrap();
        manager.commit(writer).unwrap();

        let outcome = manager.commit(reader);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("Serialization failure"),
            "Expected serialization failure error"
        );
    }

    /// The same interleaving is accepted under snapshot isolation.
    #[test]
    fn test_ssi_no_conflict_when_not_serializable() {
        let manager = TransactionManager::new();

        let reader = manager.begin();
        let writer = manager.begin();

        let entity = NodeId::new(42);
        manager.record_read(reader, entity).unwrap();

        manager.record_write(writer, entity).unwrap();
        manager.commit(writer).unwrap();

        let outcome = manager.commit(reader);
        assert!(
            outcome.is_ok(),
            "Snapshot Isolation should not detect read-write conflicts"
        );
    }

    /// A writer that committed before the reader started does not conflict.
    #[test]
    fn test_ssi_no_conflict_when_write_before_read() {
        let manager = TransactionManager::new();
        let entity = NodeId::new(42);

        let writer = manager.begin();
        manager.record_write(writer, entity).unwrap();
        manager.commit(writer).unwrap();

        let reader = manager.begin_with_isolation(IsolationLevel::Serializable);
        manager.record_read(reader, entity).unwrap();

        let outcome = manager.commit(reader);
        assert!(
            outcome.is_ok(),
            "Should not conflict when writer committed before reader started"
        );
    }

    /// Two serializable transactions that read both accounts and each write a different
    /// one cannot both commit.
    #[test]
    fn test_write_skew_prevented_by_ssi() {
        let manager = TransactionManager::new();

        let account_a = NodeId::new(1);
        let account_b = NodeId::new(2);

        let first = manager.begin_with_isolation(IsolationLevel::Serializable);
        let second = manager.begin_with_isolation(IsolationLevel::Serializable);

        manager.record_read(first, account_a).unwrap();
        manager.record_read(first, account_b).unwrap();
        manager.record_read(second, account_a).unwrap();
        manager.record_read(second, account_b).unwrap();

        manager.record_write(first, account_a).unwrap();
        manager.record_write(second, account_b).unwrap();

        let first_outcome = manager.commit(first);
        assert!(first_outcome.is_ok(), "First commit should succeed");

        let second_outcome = manager.commit(second);
        assert!(second_outcome.is_err(), "Second commit should fail due to SSI");
        let message = second_outcome.unwrap_err().to_string();
        assert!(
            message.contains("Serialization failure"),
            "Expected serialization failure error for write skew prevention"
        );
    }

    /// Read-committed transactions are not failed by a concurrent committed write.
    #[test]
    fn test_read_committed_allows_non_repeatable_reads() {
        let manager = TransactionManager::new();
        let entity = NodeId::new(42);

        let reader = manager.begin_with_isolation(IsolationLevel::ReadCommitted);
        manager.record_read(reader, entity).unwrap();

        let writer = manager.begin();
        manager.record_write(writer, entity).unwrap();
        manager.commit(writer).unwrap();

        let outcome = manager.commit(reader);
        assert!(
            outcome.is_ok(),
            "ReadCommitted should allow non-repeatable reads"
        );
    }

    /// `Debug` output of each level is its variant name.
    #[test]
    fn test_isolation_level_debug() {
        let expectations = [
            (IsolationLevel::ReadCommitted, "ReadCommitted"),
            (IsolationLevel::SnapshotIsolation, "SnapshotIsolation"),
            (IsolationLevel::Serializable, "Serializable"),
        ];
        for (level, expected) in expectations {
            assert_eq!(format!("{:?}", level), expected);
        }
    }

    /// `Default` for `IsolationLevel` is snapshot isolation.
    #[test]
    fn test_isolation_level_default_trait() {
        let level = IsolationLevel::default();
        assert_eq!(level, IsolationLevel::SnapshotIsolation);
    }

    /// Two serializable readers of the same entity do not conflict with each other.
    #[test]
    fn test_ssi_concurrent_reads_no_conflict() {
        let manager = TransactionManager::new();
        let entity = NodeId::new(42);

        let first = manager.begin_with_isolation(IsolationLevel::Serializable);
        let second = manager.begin_with_isolation(IsolationLevel::Serializable);

        manager.record_read(first, entity).unwrap();
        manager.record_read(second, entity).unwrap();

        assert!(manager.commit(first).is_ok());
        assert!(manager.commit(second).is_ok());
    }

    /// Write-write conflicts are detected under serializable isolation as well.
    #[test]
    fn test_ssi_write_write_conflict() {
        let manager = TransactionManager::new();
        let entity = NodeId::new(42);

        let winner = manager.begin_with_isolation(IsolationLevel::Serializable);
        let loser = manager.begin_with_isolation(IsolationLevel::Serializable);

        manager.record_write(winner, entity).unwrap();

        let second_write = manager.record_write(loser, entity);
        assert!(
            second_write.is_err(),
            "Second record_write should fail with write-write conflict"
        );

        assert!(manager.commit(winner).is_ok());
    }
}