1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the transaction manager.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction has begun and has neither committed nor aborted.
    Active,
    /// The transaction committed successfully.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
21
/// Isolation guarantee requested for a transaction.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum IsolationLevel {
    /// Read-committed isolation.
    ReadCommitted,

    /// Snapshot isolation. This is the value of `IsolationLevel::default()`.
    #[default]
    SnapshotIsolation,

    /// Serializable isolation. At commit time the manager additionally
    /// validates the transaction's read set against writes committed after
    /// the transaction started.
    Serializable,
}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
67pub enum EntityId {
68 Node(NodeId),
70 Edge(EdgeId),
72}
73
74impl From<NodeId> for EntityId {
75 fn from(id: NodeId) -> Self {
76 Self::Node(id)
77 }
78}
79
80impl From<EdgeId> for EntityId {
81 fn from(id: EdgeId) -> Self {
82 Self::Edge(id)
83 }
84}
85
86pub struct TransactionInfo {
88 pub state: TransactionState,
90 pub isolation_level: IsolationLevel,
92 pub start_epoch: EpochId,
94 pub write_set: HashSet<EntityId>,
96 pub read_set: HashSet<EntityId>,
98}
99
100impl TransactionInfo {
101 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
103 Self {
104 state: TransactionState::Active,
105 isolation_level,
106 start_epoch,
107 write_set: HashSet::new(),
108 read_set: HashSet::new(),
109 }
110 }
111}
112
113pub struct TransactionManager {
115 next_transaction_id: AtomicU64,
117 current_epoch: AtomicU64,
119 active_count: AtomicU64,
121 transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
123 committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
126}
127
128impl TransactionManager {
129 #[must_use]
131 pub fn new() -> Self {
132 Self {
133 next_transaction_id: AtomicU64::new(2),
136 current_epoch: AtomicU64::new(0),
137 active_count: AtomicU64::new(0),
138 transactions: RwLock::new(FxHashMap::default()),
139 committed_epochs: RwLock::new(FxHashMap::default()),
140 }
141 }
142
143 pub fn begin(&self) -> TransactionId {
145 self.begin_with_isolation(IsolationLevel::default())
146 }
147
148 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
150 let transaction_id =
151 TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
152 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
153
154 let info = TransactionInfo::new(epoch, isolation_level);
155 self.transactions.write().insert(transaction_id, info);
156 self.active_count.fetch_add(1, Ordering::Relaxed);
157 transaction_id
158 }
159
160 pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
162 self.transactions
163 .read()
164 .get(&transaction_id)
165 .map(|info| info.isolation_level)
166 }
167
168 pub fn record_write(
179 &self,
180 transaction_id: TransactionId,
181 entity: impl Into<EntityId>,
182 ) -> Result<()> {
183 let entity = entity.into();
184 let mut txns = self.transactions.write();
185
186 if self.active_count.load(Ordering::Relaxed) > 1 {
189 for (other_tx, other_info) in txns.iter() {
190 if *other_tx != transaction_id
191 && other_info.state == TransactionState::Active
192 && other_info.write_set.contains(&entity)
193 {
194 return Err(Error::Transaction(TransactionError::WriteConflict(
195 format!("Write-write conflict on entity {entity:?}"),
196 )));
197 }
198 }
199 }
200
201 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
203 Error::Transaction(TransactionError::InvalidState(
204 "Transaction not found".to_string(),
205 ))
206 })?;
207
208 if info.state != TransactionState::Active {
209 return Err(Error::Transaction(TransactionError::InvalidState(
210 "Transaction is not active".to_string(),
211 )));
212 }
213
214 info.write_set.insert(entity);
215 Ok(())
216 }
217
218 pub fn record_read(
224 &self,
225 transaction_id: TransactionId,
226 entity: impl Into<EntityId>,
227 ) -> Result<()> {
228 let mut txns = self.transactions.write();
229 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
230 Error::Transaction(TransactionError::InvalidState(
231 "Transaction not found".to_string(),
232 ))
233 })?;
234
235 if info.state != TransactionState::Active {
236 return Err(Error::Transaction(TransactionError::InvalidState(
237 "Transaction is not active".to_string(),
238 )));
239 }
240
241 info.read_set.insert(entity.into());
242 Ok(())
243 }
244
245 pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
263 let mut txns = self.transactions.write();
264 let committed = self.committed_epochs.read();
265
266 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
268 let info = txns.get(&transaction_id).ok_or_else(|| {
269 Error::Transaction(TransactionError::InvalidState(
270 "Transaction not found".to_string(),
271 ))
272 })?;
273
274 if info.state != TransactionState::Active {
275 return Err(Error::Transaction(TransactionError::InvalidState(
276 "Transaction is not active".to_string(),
277 )));
278 }
279
280 (
281 info.isolation_level,
282 info.start_epoch,
283 info.write_set.clone(),
284 info.read_set.clone(),
285 )
286 };
287
288 for (other_tx, commit_epoch) in committed.iter() {
293 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
294 if let Some(other_info) = txns.get(other_tx) {
296 for entity in &our_write_set {
297 if other_info.write_set.contains(entity) {
298 return Err(Error::Transaction(TransactionError::WriteConflict(
299 format!("Write-write conflict on entity {:?}", entity),
300 )));
301 }
302 }
303 }
304 }
305 }
306
307 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
312 for (other_tx, commit_epoch) in committed.iter() {
313 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
314 if let Some(other_info) = txns.get(other_tx) {
316 for entity in &our_read_set {
317 if other_info.write_set.contains(entity) {
318 return Err(Error::Transaction(
319 TransactionError::SerializationFailure(format!(
320 "Read-write conflict on entity {:?}: \
321 another transaction modified data we read",
322 entity
323 )),
324 ));
325 }
326 }
327 }
328 }
329 }
330
331 for (other_tx, other_info) in txns.iter() {
334 if *other_tx == transaction_id {
335 continue;
336 }
337 if other_info.state == TransactionState::Committed {
338 for entity in &our_read_set {
340 if other_info.write_set.contains(entity) {
341 if let Some(commit_epoch) = committed.get(other_tx)
343 && commit_epoch.as_u64() > our_start_epoch.as_u64()
344 {
345 return Err(Error::Transaction(
346 TransactionError::SerializationFailure(format!(
347 "Read-write conflict on entity {:?}: \
348 another transaction modified data we read",
349 entity
350 )),
351 ));
352 }
353 }
354 }
355 }
356 }
357 }
358
359 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
362
363 if let Some(info) = txns.get_mut(&transaction_id) {
365 info.state = TransactionState::Committed;
366 }
367 self.active_count.fetch_sub(1, Ordering::Relaxed);
368
369 drop(committed);
371 self.committed_epochs
372 .write()
373 .insert(transaction_id, commit_epoch);
374
375 Ok(commit_epoch)
376 }
377
378 pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
384 let mut txns = self.transactions.write();
385
386 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
387 Error::Transaction(TransactionError::InvalidState(
388 "Transaction not found".to_string(),
389 ))
390 })?;
391
392 if info.state != TransactionState::Active {
393 return Err(Error::Transaction(TransactionError::InvalidState(
394 "Transaction is not active".to_string(),
395 )));
396 }
397
398 info.state = TransactionState::Aborted;
399 self.active_count.fetch_sub(1, Ordering::Relaxed);
400 Ok(())
401 }
402
403 pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
412 let txns = self.transactions.read();
413 let info = txns.get(&transaction_id).ok_or_else(|| {
414 Error::Transaction(TransactionError::InvalidState(
415 "Transaction not found".to_string(),
416 ))
417 })?;
418 Ok(info.write_set.clone())
419 }
420
421 pub fn reset_write_set(
427 &self,
428 transaction_id: TransactionId,
429 write_set: HashSet<EntityId>,
430 ) -> Result<()> {
431 let mut txns = self.transactions.write();
432 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
433 Error::Transaction(TransactionError::InvalidState(
434 "Transaction not found".to_string(),
435 ))
436 })?;
437 info.write_set = write_set;
438 Ok(())
439 }
440
441 pub fn abort_all_active(&self) {
445 let mut txns = self.transactions.write();
446 for info in txns.values_mut() {
447 if info.state == TransactionState::Active {
448 info.state = TransactionState::Aborted;
449 self.active_count.fetch_sub(1, Ordering::Relaxed);
450 }
451 }
452 }
453
454 pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
456 self.transactions
457 .read()
458 .get(&transaction_id)
459 .map(|info| info.state)
460 }
461
462 pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
464 self.transactions
465 .read()
466 .get(&transaction_id)
467 .map(|info| info.start_epoch)
468 }
469
470 #[must_use]
472 pub fn current_epoch(&self) -> EpochId {
473 EpochId::new(self.current_epoch.load(Ordering::Acquire))
474 }
475
476 pub fn sync_epoch(&self, epoch: EpochId) {
481 self.current_epoch
482 .fetch_max(epoch.as_u64(), Ordering::SeqCst);
483 }
484
485 #[must_use]
490 pub fn min_active_epoch(&self) -> EpochId {
491 let txns = self.transactions.read();
492 txns.values()
493 .filter(|info| info.state == TransactionState::Active)
494 .map(|info| info.start_epoch)
495 .min()
496 .unwrap_or_else(|| self.current_epoch())
497 }
498
499 #[must_use]
501 pub fn active_count(&self) -> usize {
502 self.transactions
503 .read()
504 .values()
505 .filter(|info| info.state == TransactionState::Active)
506 .count()
507 }
508
509 pub fn gc(&self) -> usize {
517 let mut txns = self.transactions.write();
518 let mut committed = self.committed_epochs.write();
519
520 let min_active_start = txns
522 .values()
523 .filter(|info| info.state == TransactionState::Active)
524 .map(|info| info.start_epoch)
525 .min();
526
527 let initial_count = txns.len();
528
529 let to_remove: Vec<TransactionId> = txns
531 .iter()
532 .filter(|(transaction_id, info)| {
533 match info.state {
534 TransactionState::Active => false, TransactionState::Aborted => true, TransactionState::Committed => {
537 if let Some(min_start) = min_active_start {
540 if let Some(commit_epoch) = committed.get(*transaction_id) {
541 commit_epoch.as_u64() < min_start.as_u64()
543 } else {
544 false
546 }
547 } else {
548 true
550 }
551 }
552 }
553 })
554 .map(|(id, _)| *id)
555 .collect();
556
557 for id in &to_remove {
558 txns.remove(id);
559 committed.remove(id);
560 }
561
562 initial_count - txns.len()
563 }
564
565 pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
569 self.committed_epochs.write().insert(transaction_id, epoch);
570 }
571
572 #[must_use]
576 pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
577 let next = self.next_transaction_id.load(Ordering::Relaxed);
578 if next > 1 {
579 Some(TransactionId::new(next - 1))
580 } else {
581 None
582 }
583 }
584}
585
586impl Default for TransactionManager {
587 fn default() -> Self {
588 Self::new()
589 }
590}
591
592#[cfg(test)]
593mod tests {
594 use super::*;
595
596 #[test]
597 fn test_begin_commit() {
598 let mgr = TransactionManager::new();
599
600 let tx = mgr.begin();
601 assert_eq!(mgr.state(tx), Some(TransactionState::Active));
602
603 let commit_epoch = mgr.commit(tx).unwrap();
604 assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
605 assert!(commit_epoch.as_u64() > 0);
606 }
607
608 #[test]
609 fn test_begin_abort() {
610 let mgr = TransactionManager::new();
611
612 let tx = mgr.begin();
613 mgr.abort(tx).unwrap();
614 assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
615 }
616
617 #[test]
618 fn test_epoch_advancement() {
619 let mgr = TransactionManager::new();
620
621 let initial_epoch = mgr.current_epoch();
622
623 let tx = mgr.begin();
624 let commit_epoch = mgr.commit(tx).unwrap();
625
626 assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
627 assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
628 }
629
630 #[test]
631 fn test_gc_preserves_needed_write_sets() {
632 let mgr = TransactionManager::new();
633
634 let tx1 = mgr.begin();
635 let tx2 = mgr.begin();
636
637 mgr.commit(tx1).unwrap();
638 assert_eq!(mgr.active_count(), 1);
641
642 let cleaned = mgr.gc();
644 assert_eq!(cleaned, 0);
645
646 assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
648 assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
649 }
650
651 #[test]
652 fn test_gc_removes_old_commits() {
653 let mgr = TransactionManager::new();
654
655 let tx1 = mgr.begin();
657 mgr.commit(tx1).unwrap();
658
659 let tx2 = mgr.begin();
661 mgr.commit(tx2).unwrap();
662
663 let tx3 = mgr.begin();
665
666 let cleaned = mgr.gc();
670 assert_eq!(cleaned, 1); assert_eq!(mgr.state(tx1), None);
673 assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
675
676 mgr.commit(tx3).unwrap();
678 let cleaned = mgr.gc();
679 assert_eq!(cleaned, 2); }
681
682 #[test]
683 fn test_gc_removes_aborted() {
684 let mgr = TransactionManager::new();
685
686 let tx1 = mgr.begin();
687 let tx2 = mgr.begin();
688
689 mgr.abort(tx1).unwrap();
690 let cleaned = mgr.gc();
694 assert_eq!(cleaned, 1);
695
696 assert_eq!(mgr.state(tx1), None);
697 assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
698 }
699
700 #[test]
701 fn test_write_tracking() {
702 let mgr = TransactionManager::new();
703
704 let tx = mgr.begin();
705
706 mgr.record_write(tx, NodeId::new(1)).unwrap();
708 mgr.record_write(tx, NodeId::new(2)).unwrap();
709 mgr.record_write(tx, EdgeId::new(100)).unwrap();
710
711 assert!(mgr.commit(tx).is_ok());
713 }
714
715 #[test]
716 fn test_min_active_epoch() {
717 let mgr = TransactionManager::new();
718
719 assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
721
722 let tx1 = mgr.begin();
724 let epoch1 = mgr.start_epoch(tx1).unwrap();
725
726 let tx2 = mgr.begin();
728 mgr.commit(tx2).unwrap();
729
730 let _tx3 = mgr.begin();
731
732 assert_eq!(mgr.min_active_epoch(), epoch1);
734 }
735
736 #[test]
737 fn test_abort_all_active() {
738 let mgr = TransactionManager::new();
739
740 let tx1 = mgr.begin();
741 let tx2 = mgr.begin();
742 let tx3 = mgr.begin();
743
744 mgr.commit(tx1).unwrap();
745 mgr.abort_all_active();
748
749 assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
751 assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
752 }
753
754 #[test]
755 fn test_start_epoch_snapshot() {
756 let mgr = TransactionManager::new();
757
758 let tx1 = mgr.begin();
760 let start1 = mgr.start_epoch(tx1).unwrap();
761
762 mgr.commit(tx1).unwrap();
764
765 let tx2 = mgr.begin();
767 let start2 = mgr.start_epoch(tx2).unwrap();
768
769 assert!(start2.as_u64() > start1.as_u64());
771 }
772
773 #[test]
774 fn test_write_write_conflict_detection() {
775 let mgr = TransactionManager::new();
776
777 let tx1 = mgr.begin();
779 let tx2 = mgr.begin();
780
781 let entity = NodeId::new(42);
783 mgr.record_write(tx1, entity).unwrap();
784
785 let result = mgr.record_write(tx2, entity);
787 assert!(result.is_err());
788 assert!(
789 result
790 .unwrap_err()
791 .to_string()
792 .contains("Write-write conflict"),
793 "Expected write-write conflict error"
794 );
795
796 let result1 = mgr.commit(tx1);
798 assert!(result1.is_ok());
799 }
800
801 #[test]
802 fn test_commit_epoch_monotonicity() {
803 let mgr = TransactionManager::new();
804
805 let mut epochs = Vec::new();
806
807 for _ in 0..10 {
809 let tx = mgr.begin();
810 let epoch = mgr.commit(tx).unwrap();
811 epochs.push(epoch.as_u64());
812 }
813
814 for i in 1..epochs.len() {
816 assert!(
817 epochs[i] > epochs[i - 1],
818 "Epoch {} ({}) should be greater than epoch {} ({})",
819 i,
820 epochs[i],
821 i - 1,
822 epochs[i - 1]
823 );
824 }
825 }
826
827 #[test]
828 fn test_concurrent_commits_via_threads() {
829 use std::sync::Arc;
830 use std::thread;
831
832 let mgr = Arc::new(TransactionManager::new());
833 let num_threads = 10;
834 let commits_per_thread = 100;
835
836 let handles: Vec<_> = (0..num_threads)
837 .map(|_| {
838 let mgr = Arc::clone(&mgr);
839 thread::spawn(move || {
840 let mut epochs = Vec::new();
841 for _ in 0..commits_per_thread {
842 let tx = mgr.begin();
843 let epoch = mgr.commit(tx).unwrap();
844 epochs.push(epoch.as_u64());
845 }
846 epochs
847 })
848 })
849 .collect();
850
851 let mut all_epochs: Vec<u64> = handles
852 .into_iter()
853 .flat_map(|h| h.join().unwrap())
854 .collect();
855
856 all_epochs.sort_unstable();
858 let unique_count = all_epochs.len();
859 all_epochs.dedup();
860 assert_eq!(
861 all_epochs.len(),
862 unique_count,
863 "All commit epochs should be unique"
864 );
865
866 assert_eq!(
868 mgr.current_epoch().as_u64(),
869 (num_threads * commits_per_thread) as u64,
870 "Final epoch should equal total commits"
871 );
872 }
873
874 #[test]
875 fn test_isolation_level_default() {
876 let mgr = TransactionManager::new();
877
878 let tx = mgr.begin();
879 assert_eq!(
880 mgr.isolation_level(tx),
881 Some(IsolationLevel::SnapshotIsolation)
882 );
883 }
884
885 #[test]
886 fn test_isolation_level_explicit() {
887 let mgr = TransactionManager::new();
888
889 let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
890 let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
891 let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
892
893 assert_eq!(
894 mgr.isolation_level(transaction_rc),
895 Some(IsolationLevel::ReadCommitted)
896 );
897 assert_eq!(
898 mgr.isolation_level(transaction_si),
899 Some(IsolationLevel::SnapshotIsolation)
900 );
901 assert_eq!(
902 mgr.isolation_level(transaction_ser),
903 Some(IsolationLevel::Serializable)
904 );
905 }
906
907 #[test]
908 fn test_ssi_read_write_conflict_detected() {
909 let mgr = TransactionManager::new();
910
911 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
913
914 let tx2 = mgr.begin();
916
917 let entity = NodeId::new(42);
919 mgr.record_read(tx1, entity).unwrap();
920
921 mgr.record_write(tx2, entity).unwrap();
923 mgr.commit(tx2).unwrap();
924
925 let result = mgr.commit(tx1);
927 assert!(result.is_err());
928 assert!(
929 result
930 .unwrap_err()
931 .to_string()
932 .contains("Serialization failure"),
933 "Expected serialization failure error"
934 );
935 }
936
937 #[test]
938 fn test_ssi_no_conflict_when_not_serializable() {
939 let mgr = TransactionManager::new();
940
941 let tx1 = mgr.begin();
943
944 let tx2 = mgr.begin();
946
947 let entity = NodeId::new(42);
949 mgr.record_read(tx1, entity).unwrap();
950
951 mgr.record_write(tx2, entity).unwrap();
953 mgr.commit(tx2).unwrap();
954
955 let result = mgr.commit(tx1);
957 assert!(
958 result.is_ok(),
959 "Snapshot Isolation should not detect read-write conflicts"
960 );
961 }
962
963 #[test]
964 fn test_ssi_no_conflict_when_write_before_read() {
965 let mgr = TransactionManager::new();
966
967 let tx1 = mgr.begin();
969 let entity = NodeId::new(42);
970 mgr.record_write(tx1, entity).unwrap();
971 mgr.commit(tx1).unwrap();
972
973 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
975 mgr.record_read(tx2, entity).unwrap();
976
977 let result = mgr.commit(tx2);
979 assert!(
980 result.is_ok(),
981 "Should not conflict when writer committed before reader started"
982 );
983 }
984
985 #[test]
986 fn test_write_skew_prevented_by_ssi() {
987 let mgr = TransactionManager::new();
994
995 let account_a = NodeId::new(1);
996 let account_b = NodeId::new(2);
997
998 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1000 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1001
1002 mgr.record_read(tx1, account_a).unwrap();
1004 mgr.record_read(tx1, account_b).unwrap();
1005 mgr.record_read(tx2, account_a).unwrap();
1006 mgr.record_read(tx2, account_b).unwrap();
1007
1008 mgr.record_write(tx1, account_a).unwrap();
1010 mgr.record_write(tx2, account_b).unwrap();
1011
1012 let result1 = mgr.commit(tx1);
1014 assert!(result1.is_ok(), "First commit should succeed");
1015
1016 let result2 = mgr.commit(tx2);
1018 assert!(result2.is_err(), "Second commit should fail due to SSI");
1019 assert!(
1020 result2
1021 .unwrap_err()
1022 .to_string()
1023 .contains("Serialization failure"),
1024 "Expected serialization failure error for write skew prevention"
1025 );
1026 }
1027
1028 #[test]
1029 fn test_read_committed_allows_non_repeatable_reads() {
1030 let mgr = TransactionManager::new();
1031
1032 let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1034 let entity = NodeId::new(42);
1035
1036 mgr.record_read(tx1, entity).unwrap();
1038
1039 let tx2 = mgr.begin();
1041 mgr.record_write(tx2, entity).unwrap();
1042 mgr.commit(tx2).unwrap();
1043
1044 let result = mgr.commit(tx1);
1046 assert!(
1047 result.is_ok(),
1048 "ReadCommitted should allow non-repeatable reads"
1049 );
1050 }
1051
1052 #[test]
1053 fn test_isolation_level_debug() {
1054 assert_eq!(
1055 format!("{:?}", IsolationLevel::ReadCommitted),
1056 "ReadCommitted"
1057 );
1058 assert_eq!(
1059 format!("{:?}", IsolationLevel::SnapshotIsolation),
1060 "SnapshotIsolation"
1061 );
1062 assert_eq!(
1063 format!("{:?}", IsolationLevel::Serializable),
1064 "Serializable"
1065 );
1066 }
1067
1068 #[test]
1069 fn test_isolation_level_default_trait() {
1070 let default: IsolationLevel = Default::default();
1071 assert_eq!(default, IsolationLevel::SnapshotIsolation);
1072 }
1073
1074 #[test]
1075 fn test_ssi_concurrent_reads_no_conflict() {
1076 let mgr = TransactionManager::new();
1077
1078 let entity = NodeId::new(42);
1079
1080 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1082 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1083
1084 mgr.record_read(tx1, entity).unwrap();
1085 mgr.record_read(tx2, entity).unwrap();
1086
1087 assert!(mgr.commit(tx1).is_ok());
1089 assert!(mgr.commit(tx2).is_ok());
1090 }
1091
1092 #[test]
1093 fn test_ssi_write_write_conflict() {
1094 let mgr = TransactionManager::new();
1095
1096 let entity = NodeId::new(42);
1097
1098 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1100 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1101
1102 mgr.record_write(tx1, entity).unwrap();
1104
1105 let result = mgr.record_write(tx2, entity);
1107 assert!(
1108 result.is_err(),
1109 "Second record_write should fail with write-write conflict"
1110 );
1111
1112 assert!(mgr.commit(tx1).is_ok());
1114 }
1115}