1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13#[non_exhaustive]
14pub enum TransactionState {
15 Active,
17 Committed,
19 Aborted,
21}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42#[non_exhaustive]
43pub enum IsolationLevel {
44 ReadCommitted,
49
50 #[default]
56 SnapshotIsolation,
57
58 Serializable,
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68#[non_exhaustive]
69pub enum EntityId {
70 Node(NodeId),
72 Edge(EdgeId),
74}
75
76impl From<NodeId> for EntityId {
77 fn from(id: NodeId) -> Self {
78 Self::Node(id)
79 }
80}
81
82impl From<EdgeId> for EntityId {
83 fn from(id: EdgeId) -> Self {
84 Self::Edge(id)
85 }
86}
87
88pub struct TransactionInfo {
90 pub state: TransactionState,
92 pub isolation_level: IsolationLevel,
94 pub start_epoch: EpochId,
96 pub write_set: HashSet<EntityId>,
98 pub read_set: HashSet<EntityId>,
100}
101
102impl TransactionInfo {
103 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
105 Self {
106 state: TransactionState::Active,
107 isolation_level,
108 start_epoch,
109 write_set: HashSet::new(),
110 read_set: HashSet::new(),
111 }
112 }
113}
114
115pub struct TransactionManager {
117 next_transaction_id: AtomicU64,
119 current_epoch: AtomicU64,
121 active_count: AtomicU64,
123 transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
125 committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
128}
129
130impl TransactionManager {
131 #[must_use]
133 pub fn new() -> Self {
134 Self {
135 next_transaction_id: AtomicU64::new(2),
138 current_epoch: AtomicU64::new(0),
139 active_count: AtomicU64::new(0),
140 transactions: RwLock::new(FxHashMap::default()),
141 committed_epochs: RwLock::new(FxHashMap::default()),
142 }
143 }
144
145 pub fn begin(&self) -> TransactionId {
147 self.begin_with_isolation(IsolationLevel::default())
148 }
149
150 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
152 let transaction_id =
153 TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
154 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
155
156 let info = TransactionInfo::new(epoch, isolation_level);
157 self.transactions.write().insert(transaction_id, info);
158 self.active_count.fetch_add(1, Ordering::Relaxed);
159 transaction_id
160 }
161
162 pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
164 self.transactions
165 .read()
166 .get(&transaction_id)
167 .map(|info| info.isolation_level)
168 }
169
170 pub fn record_write(
181 &self,
182 transaction_id: TransactionId,
183 entity: impl Into<EntityId>,
184 ) -> Result<()> {
185 let entity = entity.into();
186 let mut txns = self.transactions.write();
187
188 if self.active_count.load(Ordering::Relaxed) > 1 {
191 for (other_tx, other_info) in txns.iter() {
192 if *other_tx != transaction_id
193 && other_info.state == TransactionState::Active
194 && other_info.write_set.contains(&entity)
195 {
196 return Err(Error::Transaction(TransactionError::WriteConflict(
197 format!("Write-write conflict on entity {entity:?}"),
198 )));
199 }
200 }
201 }
202
203 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
205 Error::Transaction(TransactionError::InvalidState(
206 "Transaction not found".to_string(),
207 ))
208 })?;
209
210 if info.state != TransactionState::Active {
211 return Err(Error::Transaction(TransactionError::InvalidState(
212 "Transaction is not active".to_string(),
213 )));
214 }
215
216 info.write_set.insert(entity);
217 Ok(())
218 }
219
220 pub fn record_read(
226 &self,
227 transaction_id: TransactionId,
228 entity: impl Into<EntityId>,
229 ) -> Result<()> {
230 let mut txns = self.transactions.write();
231 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
232 Error::Transaction(TransactionError::InvalidState(
233 "Transaction not found".to_string(),
234 ))
235 })?;
236
237 if info.state != TransactionState::Active {
238 return Err(Error::Transaction(TransactionError::InvalidState(
239 "Transaction is not active".to_string(),
240 )));
241 }
242
243 info.read_set.insert(entity.into());
244 Ok(())
245 }
246
247 pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
265 let mut txns = self.transactions.write();
266 let committed = self.committed_epochs.read();
267
268 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
270 let info = txns.get(&transaction_id).ok_or_else(|| {
271 Error::Transaction(TransactionError::InvalidState(
272 "Transaction not found".to_string(),
273 ))
274 })?;
275
276 if info.state != TransactionState::Active {
277 return Err(Error::Transaction(TransactionError::InvalidState(
278 "Transaction is not active".to_string(),
279 )));
280 }
281
282 (
283 info.isolation_level,
284 info.start_epoch,
285 info.write_set.clone(),
286 info.read_set.clone(),
287 )
288 };
289
290 for (other_tx, commit_epoch) in committed.iter() {
295 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
296 if let Some(other_info) = txns.get(other_tx) {
298 for entity in &our_write_set {
299 if other_info.write_set.contains(entity) {
300 return Err(Error::Transaction(TransactionError::WriteConflict(
301 format!("Write-write conflict on entity {:?}", entity),
302 )));
303 }
304 }
305 }
306 }
307 }
308
309 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
314 for (other_tx, commit_epoch) in committed.iter() {
315 if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
316 if let Some(other_info) = txns.get(other_tx) {
318 for entity in &our_read_set {
319 if other_info.write_set.contains(entity) {
320 return Err(Error::Transaction(
321 TransactionError::SerializationFailure(format!(
322 "Read-write conflict on entity {:?}: \
323 another transaction modified data we read",
324 entity
325 )),
326 ));
327 }
328 }
329 }
330 }
331 }
332
333 for (other_tx, other_info) in txns.iter() {
336 if *other_tx == transaction_id {
337 continue;
338 }
339 if other_info.state == TransactionState::Committed {
340 for entity in &our_read_set {
342 if other_info.write_set.contains(entity) {
343 if let Some(commit_epoch) = committed.get(other_tx)
345 && commit_epoch.as_u64() > our_start_epoch.as_u64()
346 {
347 return Err(Error::Transaction(
348 TransactionError::SerializationFailure(format!(
349 "Read-write conflict on entity {:?}: \
350 another transaction modified data we read",
351 entity
352 )),
353 ));
354 }
355 }
356 }
357 }
358 }
359 }
360
361 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
364
365 if let Some(info) = txns.get_mut(&transaction_id) {
367 info.state = TransactionState::Committed;
368 }
369 self.active_count.fetch_sub(1, Ordering::Relaxed);
370
371 drop(committed);
373 self.committed_epochs
374 .write()
375 .insert(transaction_id, commit_epoch);
376
377 Ok(commit_epoch)
378 }
379
380 pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
386 let mut txns = self.transactions.write();
387
388 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
389 Error::Transaction(TransactionError::InvalidState(
390 "Transaction not found".to_string(),
391 ))
392 })?;
393
394 if info.state != TransactionState::Active {
395 return Err(Error::Transaction(TransactionError::InvalidState(
396 "Transaction is not active".to_string(),
397 )));
398 }
399
400 info.state = TransactionState::Aborted;
401 self.active_count.fetch_sub(1, Ordering::Relaxed);
402 Ok(())
403 }
404
405 pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
414 let txns = self.transactions.read();
415 let info = txns.get(&transaction_id).ok_or_else(|| {
416 Error::Transaction(TransactionError::InvalidState(
417 "Transaction not found".to_string(),
418 ))
419 })?;
420 Ok(info.write_set.clone())
421 }
422
423 pub fn reset_write_set(
429 &self,
430 transaction_id: TransactionId,
431 write_set: HashSet<EntityId>,
432 ) -> Result<()> {
433 let mut txns = self.transactions.write();
434 let info = txns.get_mut(&transaction_id).ok_or_else(|| {
435 Error::Transaction(TransactionError::InvalidState(
436 "Transaction not found".to_string(),
437 ))
438 })?;
439 info.write_set = write_set;
440 Ok(())
441 }
442
443 pub fn abort_all_active(&self) {
447 let mut txns = self.transactions.write();
448 for info in txns.values_mut() {
449 if info.state == TransactionState::Active {
450 info.state = TransactionState::Aborted;
451 self.active_count.fetch_sub(1, Ordering::Relaxed);
452 }
453 }
454 }
455
456 pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
458 self.transactions
459 .read()
460 .get(&transaction_id)
461 .map(|info| info.state)
462 }
463
464 pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
466 self.transactions
467 .read()
468 .get(&transaction_id)
469 .map(|info| info.start_epoch)
470 }
471
472 #[must_use]
474 pub fn current_epoch(&self) -> EpochId {
475 EpochId::new(self.current_epoch.load(Ordering::Acquire))
476 }
477
478 pub fn sync_epoch(&self, epoch: EpochId) {
483 self.current_epoch
484 .fetch_max(epoch.as_u64(), Ordering::SeqCst);
485 }
486
487 #[must_use]
492 pub fn min_active_epoch(&self) -> EpochId {
493 let txns = self.transactions.read();
494 txns.values()
495 .filter(|info| info.state == TransactionState::Active)
496 .map(|info| info.start_epoch)
497 .min()
498 .unwrap_or_else(|| self.current_epoch())
499 }
500
501 #[must_use]
503 pub fn active_count(&self) -> usize {
504 self.transactions
505 .read()
506 .values()
507 .filter(|info| info.state == TransactionState::Active)
508 .count()
509 }
510
511 pub fn gc(&self) -> usize {
519 let mut txns = self.transactions.write();
520 let mut committed = self.committed_epochs.write();
521
522 let min_active_start = txns
524 .values()
525 .filter(|info| info.state == TransactionState::Active)
526 .map(|info| info.start_epoch)
527 .min();
528
529 let initial_count = txns.len();
530
531 let to_remove: Vec<TransactionId> = txns
533 .iter()
534 .filter(|(transaction_id, info)| {
535 match info.state {
536 TransactionState::Active => false, TransactionState::Aborted => true, TransactionState::Committed => {
539 if let Some(min_start) = min_active_start {
542 if let Some(commit_epoch) = committed.get(*transaction_id) {
543 commit_epoch.as_u64() < min_start.as_u64()
545 } else {
546 false
548 }
549 } else {
550 true
552 }
553 }
554 }
555 })
556 .map(|(id, _)| *id)
557 .collect();
558
559 for id in &to_remove {
560 txns.remove(id);
561 committed.remove(id);
562 }
563
564 initial_count - txns.len()
565 }
566
567 pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
571 self.committed_epochs.write().insert(transaction_id, epoch);
572 }
573
574 #[must_use]
578 pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
579 let next = self.next_transaction_id.load(Ordering::Relaxed);
580 if next > 1 {
581 Some(TransactionId::new(next - 1))
582 } else {
583 None
584 }
585 }
586}
587
588impl Default for TransactionManager {
589 fn default() -> Self {
590 Self::new()
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597
598 #[test]
599 fn test_begin_commit() {
600 let mgr = TransactionManager::new();
601
602 let tx = mgr.begin();
603 assert_eq!(mgr.state(tx), Some(TransactionState::Active));
604
605 let commit_epoch = mgr.commit(tx).unwrap();
606 assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
607 assert!(commit_epoch.as_u64() > 0);
608 }
609
610 #[test]
611 fn test_begin_abort() {
612 let mgr = TransactionManager::new();
613
614 let tx = mgr.begin();
615 mgr.abort(tx).unwrap();
616 assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
617 }
618
619 #[test]
620 fn test_epoch_advancement() {
621 let mgr = TransactionManager::new();
622
623 let initial_epoch = mgr.current_epoch();
624
625 let tx = mgr.begin();
626 let commit_epoch = mgr.commit(tx).unwrap();
627
628 assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
629 assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
630 }
631
632 #[test]
633 fn test_gc_preserves_needed_write_sets() {
634 let mgr = TransactionManager::new();
635
636 let tx1 = mgr.begin();
637 let tx2 = mgr.begin();
638
639 mgr.commit(tx1).unwrap();
640 assert_eq!(mgr.active_count(), 1);
643
644 let cleaned = mgr.gc();
646 assert_eq!(cleaned, 0);
647
648 assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
650 assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
651 }
652
653 #[test]
654 fn test_gc_removes_old_commits() {
655 let mgr = TransactionManager::new();
656
657 let tx1 = mgr.begin();
659 mgr.commit(tx1).unwrap();
660
661 let tx2 = mgr.begin();
663 mgr.commit(tx2).unwrap();
664
665 let tx3 = mgr.begin();
667
668 let cleaned = mgr.gc();
672 assert_eq!(cleaned, 1); assert_eq!(mgr.state(tx1), None);
675 assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
677
678 mgr.commit(tx3).unwrap();
680 let cleaned = mgr.gc();
681 assert_eq!(cleaned, 2); }
683
684 #[test]
685 fn test_gc_removes_aborted() {
686 let mgr = TransactionManager::new();
687
688 let tx1 = mgr.begin();
689 let tx2 = mgr.begin();
690
691 mgr.abort(tx1).unwrap();
692 let cleaned = mgr.gc();
696 assert_eq!(cleaned, 1);
697
698 assert_eq!(mgr.state(tx1), None);
699 assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
700 }
701
702 #[test]
703 fn test_write_tracking() {
704 let mgr = TransactionManager::new();
705
706 let tx = mgr.begin();
707
708 mgr.record_write(tx, NodeId::new(1)).unwrap();
710 mgr.record_write(tx, NodeId::new(2)).unwrap();
711 mgr.record_write(tx, EdgeId::new(100)).unwrap();
712
713 assert!(mgr.commit(tx).is_ok());
715 }
716
717 #[test]
718 fn test_min_active_epoch() {
719 let mgr = TransactionManager::new();
720
721 assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
723
724 let tx1 = mgr.begin();
726 let epoch1 = mgr.start_epoch(tx1).unwrap();
727
728 let tx2 = mgr.begin();
730 mgr.commit(tx2).unwrap();
731
732 let _tx3 = mgr.begin();
733
734 assert_eq!(mgr.min_active_epoch(), epoch1);
736 }
737
738 #[test]
739 fn test_abort_all_active() {
740 let mgr = TransactionManager::new();
741
742 let tx1 = mgr.begin();
743 let tx2 = mgr.begin();
744 let tx3 = mgr.begin();
745
746 mgr.commit(tx1).unwrap();
747 mgr.abort_all_active();
750
751 assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
753 assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
754 }
755
756 #[test]
757 fn test_start_epoch_snapshot() {
758 let mgr = TransactionManager::new();
759
760 let tx1 = mgr.begin();
762 let start1 = mgr.start_epoch(tx1).unwrap();
763
764 mgr.commit(tx1).unwrap();
766
767 let tx2 = mgr.begin();
769 let start2 = mgr.start_epoch(tx2).unwrap();
770
771 assert!(start2.as_u64() > start1.as_u64());
773 }
774
775 #[test]
776 fn test_write_write_conflict_detection() {
777 let mgr = TransactionManager::new();
778
779 let tx1 = mgr.begin();
781 let tx2 = mgr.begin();
782
783 let entity = NodeId::new(42);
785 mgr.record_write(tx1, entity).unwrap();
786
787 let result = mgr.record_write(tx2, entity);
789 assert!(result.is_err());
790 assert!(
791 result
792 .unwrap_err()
793 .to_string()
794 .contains("Write-write conflict"),
795 "Expected write-write conflict error"
796 );
797
798 let result1 = mgr.commit(tx1);
800 assert!(result1.is_ok());
801 }
802
803 #[test]
804 fn test_commit_epoch_monotonicity() {
805 let mgr = TransactionManager::new();
806
807 let mut epochs = Vec::new();
808
809 for _ in 0..10 {
811 let tx = mgr.begin();
812 let epoch = mgr.commit(tx).unwrap();
813 epochs.push(epoch.as_u64());
814 }
815
816 for i in 1..epochs.len() {
818 assert!(
819 epochs[i] > epochs[i - 1],
820 "Epoch {} ({}) should be greater than epoch {} ({})",
821 i,
822 epochs[i],
823 i - 1,
824 epochs[i - 1]
825 );
826 }
827 }
828
829 #[test]
830 fn test_concurrent_commits_via_threads() {
831 use std::sync::Arc;
832 use std::thread;
833
834 let mgr = Arc::new(TransactionManager::new());
835 let num_threads = 10;
836 let commits_per_thread = 100;
837
838 let handles: Vec<_> = (0..num_threads)
839 .map(|_| {
840 let mgr = Arc::clone(&mgr);
841 thread::spawn(move || {
842 let mut epochs = Vec::new();
843 for _ in 0..commits_per_thread {
844 let tx = mgr.begin();
845 let epoch = mgr.commit(tx).unwrap();
846 epochs.push(epoch.as_u64());
847 }
848 epochs
849 })
850 })
851 .collect();
852
853 let mut all_epochs: Vec<u64> = handles
854 .into_iter()
855 .flat_map(|h| h.join().unwrap())
856 .collect();
857
858 all_epochs.sort_unstable();
860 let unique_count = all_epochs.len();
861 all_epochs.dedup();
862 assert_eq!(
863 all_epochs.len(),
864 unique_count,
865 "All commit epochs should be unique"
866 );
867
868 assert_eq!(
870 mgr.current_epoch().as_u64(),
871 (num_threads * commits_per_thread) as u64,
872 "Final epoch should equal total commits"
873 );
874 }
875
876 #[test]
877 fn test_isolation_level_default() {
878 let mgr = TransactionManager::new();
879
880 let tx = mgr.begin();
881 assert_eq!(
882 mgr.isolation_level(tx),
883 Some(IsolationLevel::SnapshotIsolation)
884 );
885 }
886
887 #[test]
888 fn test_isolation_level_explicit() {
889 let mgr = TransactionManager::new();
890
891 let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
892 let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
893 let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
894
895 assert_eq!(
896 mgr.isolation_level(transaction_rc),
897 Some(IsolationLevel::ReadCommitted)
898 );
899 assert_eq!(
900 mgr.isolation_level(transaction_si),
901 Some(IsolationLevel::SnapshotIsolation)
902 );
903 assert_eq!(
904 mgr.isolation_level(transaction_ser),
905 Some(IsolationLevel::Serializable)
906 );
907 }
908
909 #[test]
910 fn test_ssi_read_write_conflict_detected() {
911 let mgr = TransactionManager::new();
912
913 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
915
916 let tx2 = mgr.begin();
918
919 let entity = NodeId::new(42);
921 mgr.record_read(tx1, entity).unwrap();
922
923 mgr.record_write(tx2, entity).unwrap();
925 mgr.commit(tx2).unwrap();
926
927 let result = mgr.commit(tx1);
929 assert!(result.is_err());
930 assert!(
931 result
932 .unwrap_err()
933 .to_string()
934 .contains("Serialization failure"),
935 "Expected serialization failure error"
936 );
937 }
938
939 #[test]
940 fn test_ssi_no_conflict_when_not_serializable() {
941 let mgr = TransactionManager::new();
942
943 let tx1 = mgr.begin();
945
946 let tx2 = mgr.begin();
948
949 let entity = NodeId::new(42);
951 mgr.record_read(tx1, entity).unwrap();
952
953 mgr.record_write(tx2, entity).unwrap();
955 mgr.commit(tx2).unwrap();
956
957 let result = mgr.commit(tx1);
959 assert!(
960 result.is_ok(),
961 "Snapshot Isolation should not detect read-write conflicts"
962 );
963 }
964
965 #[test]
966 fn test_ssi_no_conflict_when_write_before_read() {
967 let mgr = TransactionManager::new();
968
969 let tx1 = mgr.begin();
971 let entity = NodeId::new(42);
972 mgr.record_write(tx1, entity).unwrap();
973 mgr.commit(tx1).unwrap();
974
975 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
977 mgr.record_read(tx2, entity).unwrap();
978
979 let result = mgr.commit(tx2);
981 assert!(
982 result.is_ok(),
983 "Should not conflict when writer committed before reader started"
984 );
985 }
986
987 #[test]
988 fn test_write_skew_prevented_by_ssi() {
989 let mgr = TransactionManager::new();
996
997 let account_a = NodeId::new(1);
998 let account_b = NodeId::new(2);
999
1000 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1002 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1003
1004 mgr.record_read(tx1, account_a).unwrap();
1006 mgr.record_read(tx1, account_b).unwrap();
1007 mgr.record_read(tx2, account_a).unwrap();
1008 mgr.record_read(tx2, account_b).unwrap();
1009
1010 mgr.record_write(tx1, account_a).unwrap();
1012 mgr.record_write(tx2, account_b).unwrap();
1013
1014 let result1 = mgr.commit(tx1);
1016 assert!(result1.is_ok(), "First commit should succeed");
1017
1018 let result2 = mgr.commit(tx2);
1020 assert!(result2.is_err(), "Second commit should fail due to SSI");
1021 assert!(
1022 result2
1023 .unwrap_err()
1024 .to_string()
1025 .contains("Serialization failure"),
1026 "Expected serialization failure error for write skew prevention"
1027 );
1028 }
1029
1030 #[test]
1031 fn test_read_committed_allows_non_repeatable_reads() {
1032 let mgr = TransactionManager::new();
1033
1034 let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1036 let entity = NodeId::new(42);
1037
1038 mgr.record_read(tx1, entity).unwrap();
1040
1041 let tx2 = mgr.begin();
1043 mgr.record_write(tx2, entity).unwrap();
1044 mgr.commit(tx2).unwrap();
1045
1046 let result = mgr.commit(tx1);
1048 assert!(
1049 result.is_ok(),
1050 "ReadCommitted should allow non-repeatable reads"
1051 );
1052 }
1053
1054 #[test]
1055 fn test_isolation_level_debug() {
1056 assert_eq!(
1057 format!("{:?}", IsolationLevel::ReadCommitted),
1058 "ReadCommitted"
1059 );
1060 assert_eq!(
1061 format!("{:?}", IsolationLevel::SnapshotIsolation),
1062 "SnapshotIsolation"
1063 );
1064 assert_eq!(
1065 format!("{:?}", IsolationLevel::Serializable),
1066 "Serializable"
1067 );
1068 }
1069
1070 #[test]
1071 fn test_isolation_level_default_trait() {
1072 let default: IsolationLevel = Default::default();
1073 assert_eq!(default, IsolationLevel::SnapshotIsolation);
1074 }
1075
1076 #[test]
1077 fn test_ssi_concurrent_reads_no_conflict() {
1078 let mgr = TransactionManager::new();
1079
1080 let entity = NodeId::new(42);
1081
1082 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1084 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1085
1086 mgr.record_read(tx1, entity).unwrap();
1087 mgr.record_read(tx2, entity).unwrap();
1088
1089 assert!(mgr.commit(tx1).is_ok());
1091 assert!(mgr.commit(tx2).is_ok());
1092 }
1093
1094 #[test]
1095 fn test_ssi_write_write_conflict() {
1096 let mgr = TransactionManager::new();
1097
1098 let entity = NodeId::new(42);
1099
1100 let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1102 let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1103
1104 mgr.record_write(tx1, entity).unwrap();
1106
1107 let result = mgr.record_write(tx2, entity);
1109 assert!(
1110 result.is_err(),
1111 "Second record_write should fail with write-write conflict"
1112 );
1113
1114 assert!(mgr.commit(tx1).is_ok());
1116 }
1117}