1use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by [`TransactionManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxState {
    /// The transaction has begun and has been neither committed nor aborted.
    Active,
    /// The transaction passed commit validation and was assigned a commit epoch.
    Committed,
    /// The transaction was aborted, either individually or via `abort_all_active`.
    Aborted,
}
21
/// Isolation level requested for a transaction.
///
/// Within this file the level only influences commit-time validation:
/// write-write conflicts are checked for every level, and the read set is
/// validated only for [`IsolationLevel::Serializable`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read-committed isolation.
    ///
    /// Commit validation in this file treats it the same as
    /// [`IsolationLevel::SnapshotIsolation`] (no read-set check).
    // NOTE(review): read-visibility rules for this level are presumably
    // enforced elsewhere; they are not visible in this file.
    ReadCommitted,

    /// Snapshot isolation; this is the default level used by `begin`.
    ///
    /// Only write-write conflicts are detected at commit time.
    #[default]
    SnapshotIsolation,

    /// Serializable isolation.
    ///
    /// In addition to write-write conflicts, commit fails with a
    /// serialization failure when an entity in the read set was written by a
    /// transaction that committed after this one started.
    Serializable,
}
63
/// Identifier of a graph entity recorded in a transaction's read or write set.
///
/// Conflict detection compares these values for equality, so a node and an
/// edge are always distinct entries regardless of their inner ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityId {
    /// A node, identified by its [`NodeId`].
    Node(NodeId),
    /// An edge, identified by its [`EdgeId`].
    Edge(EdgeId),
}
72
73impl From<NodeId> for EntityId {
74 fn from(id: NodeId) -> Self {
75 Self::Node(id)
76 }
77}
78
79impl From<EdgeId> for EntityId {
80 fn from(id: EdgeId) -> Self {
81 Self::Edge(id)
82 }
83}
84
85pub struct TxInfo {
87 pub state: TxState,
89 pub isolation_level: IsolationLevel,
91 pub start_epoch: EpochId,
93 pub write_set: HashSet<EntityId>,
95 pub read_set: HashSet<EntityId>,
97}
98
99impl TxInfo {
100 fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102 Self {
103 state: TxState::Active,
104 isolation_level,
105 start_epoch,
106 write_set: HashSet::new(),
107 read_set: HashSet::new(),
108 }
109 }
110}
111
/// Tracks transactions, their read/write sets and the global commit epoch.
///
/// Methods that need both locks acquire `transactions` before
/// `committed_epochs`.
pub struct TransactionManager {
    /// Id that will be handed to the next transaction started via `begin`.
    next_tx_id: AtomicU64,
    /// Global epoch counter; incremented by one on every successful commit.
    current_epoch: AtomicU64,
    /// Per-transaction bookkeeping, including finished transactions until
    /// `gc` removes them.
    transactions: RwLock<FxHashMap<TxId, TxInfo>>,
    /// Commit epoch of each committed transaction, used to decide whether a
    /// committed transaction was concurrent with one that is committing.
    committed_epochs: RwLock<FxHashMap<TxId, EpochId>>,
}
124
125impl TransactionManager {
126 #[must_use]
128 pub fn new() -> Self {
129 Self {
130 next_tx_id: AtomicU64::new(2),
133 current_epoch: AtomicU64::new(0),
134 transactions: RwLock::new(FxHashMap::default()),
135 committed_epochs: RwLock::new(FxHashMap::default()),
136 }
137 }
138
139 pub fn begin(&self) -> TxId {
141 self.begin_with_isolation(IsolationLevel::default())
142 }
143
144 pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TxId {
146 let tx_id = TxId::new(self.next_tx_id.fetch_add(1, Ordering::Relaxed));
147 let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
148
149 let info = TxInfo::new(epoch, isolation_level);
150 self.transactions.write().insert(tx_id, info);
151 tx_id
152 }
153
154 pub fn isolation_level(&self, tx_id: TxId) -> Option<IsolationLevel> {
156 self.transactions
157 .read()
158 .get(&tx_id)
159 .map(|info| info.isolation_level)
160 }
161
162 pub fn record_write(&self, tx_id: TxId, entity: impl Into<EntityId>) -> Result<()> {
168 let mut txns = self.transactions.write();
169 let info = txns.get_mut(&tx_id).ok_or_else(|| {
170 Error::Transaction(TransactionError::InvalidState(
171 "Transaction not found".to_string(),
172 ))
173 })?;
174
175 if info.state != TxState::Active {
176 return Err(Error::Transaction(TransactionError::InvalidState(
177 "Transaction is not active".to_string(),
178 )));
179 }
180
181 info.write_set.insert(entity.into());
182 Ok(())
183 }
184
185 pub fn record_read(&self, tx_id: TxId, entity: impl Into<EntityId>) -> Result<()> {
191 let mut txns = self.transactions.write();
192 let info = txns.get_mut(&tx_id).ok_or_else(|| {
193 Error::Transaction(TransactionError::InvalidState(
194 "Transaction not found".to_string(),
195 ))
196 })?;
197
198 if info.state != TxState::Active {
199 return Err(Error::Transaction(TransactionError::InvalidState(
200 "Transaction is not active".to_string(),
201 )));
202 }
203
204 info.read_set.insert(entity.into());
205 Ok(())
206 }
207
208 pub fn commit(&self, tx_id: TxId) -> Result<EpochId> {
226 let mut txns = self.transactions.write();
227 let committed = self.committed_epochs.read();
228
229 let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
231 let info = txns.get(&tx_id).ok_or_else(|| {
232 Error::Transaction(TransactionError::InvalidState(
233 "Transaction not found".to_string(),
234 ))
235 })?;
236
237 if info.state != TxState::Active {
238 return Err(Error::Transaction(TransactionError::InvalidState(
239 "Transaction is not active".to_string(),
240 )));
241 }
242
243 (
244 info.isolation_level,
245 info.start_epoch,
246 info.write_set.clone(),
247 info.read_set.clone(),
248 )
249 };
250
251 for (other_tx, other_info) in txns.iter() {
253 if *other_tx == tx_id {
254 continue;
255 }
256 if other_info.state == TxState::Committed {
257 for entity in &our_write_set {
259 if other_info.write_set.contains(entity) {
260 return Err(Error::Transaction(TransactionError::WriteConflict(
261 format!("Write-write conflict on entity {:?}", entity),
262 )));
263 }
264 }
265 }
266 }
267
268 for (other_tx, commit_epoch) in committed.iter() {
270 if *other_tx != tx_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
271 if let Some(other_info) = txns.get(other_tx) {
273 for entity in &our_write_set {
274 if other_info.write_set.contains(entity) {
275 return Err(Error::Transaction(TransactionError::WriteConflict(
276 format!("Write-write conflict on entity {:?}", entity),
277 )));
278 }
279 }
280 }
281 }
282 }
283
284 if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
289 for (other_tx, commit_epoch) in committed.iter() {
290 if *other_tx != tx_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
291 if let Some(other_info) = txns.get(other_tx) {
293 for entity in &our_read_set {
294 if other_info.write_set.contains(entity) {
295 return Err(Error::Transaction(
296 TransactionError::SerializationFailure(format!(
297 "Read-write conflict on entity {:?}: \
298 another transaction modified data we read",
299 entity
300 )),
301 ));
302 }
303 }
304 }
305 }
306 }
307
308 for (other_tx, other_info) in txns.iter() {
311 if *other_tx == tx_id {
312 continue;
313 }
314 if other_info.state == TxState::Committed {
315 for entity in &our_read_set {
317 if other_info.write_set.contains(entity) {
318 if let Some(commit_epoch) = committed.get(other_tx)
320 && commit_epoch.as_u64() > our_start_epoch.as_u64()
321 {
322 return Err(Error::Transaction(
323 TransactionError::SerializationFailure(format!(
324 "Read-write conflict on entity {:?}: \
325 another transaction modified data we read",
326 entity
327 )),
328 ));
329 }
330 }
331 }
332 }
333 }
334 }
335
336 let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
339
340 if let Some(info) = txns.get_mut(&tx_id) {
342 info.state = TxState::Committed;
343 }
344
345 drop(committed);
347 self.committed_epochs.write().insert(tx_id, commit_epoch);
348
349 Ok(commit_epoch)
350 }
351
352 pub fn abort(&self, tx_id: TxId) -> Result<()> {
358 let mut txns = self.transactions.write();
359
360 let info = txns.get_mut(&tx_id).ok_or_else(|| {
361 Error::Transaction(TransactionError::InvalidState(
362 "Transaction not found".to_string(),
363 ))
364 })?;
365
366 if info.state != TxState::Active {
367 return Err(Error::Transaction(TransactionError::InvalidState(
368 "Transaction is not active".to_string(),
369 )));
370 }
371
372 info.state = TxState::Aborted;
373 Ok(())
374 }
375
376 pub fn get_write_set(&self, tx_id: TxId) -> Result<HashSet<EntityId>> {
381 let txns = self.transactions.read();
382 let info = txns.get(&tx_id).ok_or_else(|| {
383 Error::Transaction(TransactionError::InvalidState(
384 "Transaction not found".to_string(),
385 ))
386 })?;
387 Ok(info.write_set.clone())
388 }
389
390 pub fn abort_all_active(&self) {
394 let mut txns = self.transactions.write();
395 for info in txns.values_mut() {
396 if info.state == TxState::Active {
397 info.state = TxState::Aborted;
398 }
399 }
400 }
401
402 pub fn state(&self, tx_id: TxId) -> Option<TxState> {
404 self.transactions.read().get(&tx_id).map(|info| info.state)
405 }
406
407 pub fn start_epoch(&self, tx_id: TxId) -> Option<EpochId> {
409 self.transactions
410 .read()
411 .get(&tx_id)
412 .map(|info| info.start_epoch)
413 }
414
415 #[must_use]
417 pub fn current_epoch(&self) -> EpochId {
418 EpochId::new(self.current_epoch.load(Ordering::Acquire))
419 }
420
421 #[must_use]
426 pub fn min_active_epoch(&self) -> EpochId {
427 let txns = self.transactions.read();
428 txns.values()
429 .filter(|info| info.state == TxState::Active)
430 .map(|info| info.start_epoch)
431 .min()
432 .unwrap_or_else(|| self.current_epoch())
433 }
434
435 #[must_use]
437 pub fn active_count(&self) -> usize {
438 self.transactions
439 .read()
440 .values()
441 .filter(|info| info.state == TxState::Active)
442 .count()
443 }
444
445 pub fn gc(&self) -> usize {
453 let mut txns = self.transactions.write();
454 let mut committed = self.committed_epochs.write();
455
456 let min_active_start = txns
458 .values()
459 .filter(|info| info.state == TxState::Active)
460 .map(|info| info.start_epoch)
461 .min();
462
463 let initial_count = txns.len();
464
465 let to_remove: Vec<TxId> = txns
467 .iter()
468 .filter(|(tx_id, info)| {
469 match info.state {
470 TxState::Active => false, TxState::Aborted => true, TxState::Committed => {
473 if let Some(min_start) = min_active_start {
476 if let Some(commit_epoch) = committed.get(*tx_id) {
477 commit_epoch.as_u64() < min_start.as_u64()
479 } else {
480 false
482 }
483 } else {
484 true
486 }
487 }
488 }
489 })
490 .map(|(id, _)| *id)
491 .collect();
492
493 for id in &to_remove {
494 txns.remove(id);
495 committed.remove(id);
496 }
497
498 initial_count - txns.len()
499 }
500
501 pub fn mark_committed(&self, tx_id: TxId, epoch: EpochId) {
505 self.committed_epochs.write().insert(tx_id, epoch);
506 }
507
508 #[must_use]
512 pub fn last_assigned_tx_id(&self) -> Option<TxId> {
513 let next = self.next_tx_id.load(Ordering::Relaxed);
514 if next > 1 {
515 Some(TxId::new(next - 1))
516 } else {
517 None
518 }
519 }
520}
521
522impl Default for TransactionManager {
523 fn default() -> Self {
524 Self::new()
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh transaction is active, and committing it yields a non-zero epoch.
    #[test]
    fn test_begin_commit() {
        let mgr = TransactionManager::new();

        let tx = mgr.begin();
        assert_eq!(mgr.state(tx), Some(TxState::Active));

        let commit_epoch = mgr.commit(tx).unwrap();
        assert_eq!(mgr.state(tx), Some(TxState::Committed));
        // The epoch counter starts at 0 and is incremented on commit.
        assert!(commit_epoch.as_u64() > 0);
    }

    /// Aborting an active transaction moves it to `Aborted`.
    #[test]
    fn test_begin_abort() {
        let mgr = TransactionManager::new();

        let tx = mgr.begin();
        mgr.abort(tx).unwrap();
        assert_eq!(mgr.state(tx), Some(TxState::Aborted));
    }

    /// A commit advances the global epoch past its previous value.
    #[test]
    fn test_epoch_advancement() {
        let mgr = TransactionManager::new();

        let initial_epoch = mgr.current_epoch();

        let tx = mgr.begin();
        let commit_epoch = mgr.commit(tx).unwrap();

        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
    }

    /// gc keeps a committed transaction while an active transaction that
    /// started before that commit still exists.
    #[test]
    fn test_gc_preserves_needed_write_sets() {
        let mgr = TransactionManager::new();

        // Both start at epoch 0.
        let tx1 = mgr.begin();
        let tx2 = mgr.begin();

        // tx1 commits at epoch 1 while tx2 (start epoch 0) is still active.
        mgr.commit(tx1).unwrap();
        assert_eq!(mgr.active_count(), 1);

        // tx1's commit epoch is not below tx2's start epoch, so nothing is removed.
        let cleaned = mgr.gc();
        assert_eq!(cleaned, 0);

        assert_eq!(mgr.state(tx1), Some(TxState::Committed));
        assert_eq!(mgr.state(tx2), Some(TxState::Active));
    }

    /// gc removes commits older than every active start epoch, and removes
    /// all committed transactions once none is active.
    #[test]
    fn test_gc_removes_old_commits() {
        let mgr = TransactionManager::new();

        // tx1: starts at epoch 0, commits at epoch 1.
        let tx1 = mgr.begin();
        mgr.commit(tx1).unwrap();

        // tx2: starts at epoch 1, commits at epoch 2.
        let tx2 = mgr.begin();
        mgr.commit(tx2).unwrap();

        // tx3: starts at epoch 2 and stays active.
        let tx3 = mgr.begin();

        // Only tx1 (commit epoch 1 < 2) is old enough; tx2 (commit epoch 2) is kept.
        let cleaned = mgr.gc();
        assert_eq!(cleaned, 1);
        assert_eq!(mgr.state(tx1), None);
        assert_eq!(mgr.state(tx2), Some(TxState::Committed));
        assert_eq!(mgr.state(tx3), Some(TxState::Active));

        // With no active transaction left, both tx2 and tx3 are removed.
        mgr.commit(tx3).unwrap();
        let cleaned = mgr.gc();
        assert_eq!(cleaned, 2);
    }

    /// gc removes aborted transactions even while others are active.
    #[test]
    fn test_gc_removes_aborted() {
        let mgr = TransactionManager::new();

        let tx1 = mgr.begin();
        let tx2 = mgr.begin();

        mgr.abort(tx1).unwrap();
        let cleaned = mgr.gc();
        assert_eq!(cleaned, 1);

        assert_eq!(mgr.state(tx1), None);
        assert_eq!(mgr.state(tx2), Some(TxState::Active));
    }

    /// Writes to nodes and edges can be recorded and the transaction commits.
    #[test]
    fn test_write_tracking() {
        let mgr = TransactionManager::new();

        let tx = mgr.begin();

        mgr.record_write(tx, NodeId::new(1)).unwrap();
        mgr.record_write(tx, NodeId::new(2)).unwrap();
        mgr.record_write(tx, EdgeId::new(100)).unwrap();

        // No other transaction exists, so there is nothing to conflict with.
        assert!(mgr.commit(tx).is_ok());
    }

    /// `min_active_epoch` is the current epoch when idle, otherwise the
    /// oldest start epoch among active transactions.
    #[test]
    fn test_min_active_epoch() {
        let mgr = TransactionManager::new();

        // No active transactions: falls back to the current epoch.
        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());

        let tx1 = mgr.begin();
        let epoch1 = mgr.start_epoch(tx1).unwrap();

        // A commit advances the epoch, so later transactions start later.
        let tx2 = mgr.begin();
        mgr.commit(tx2).unwrap();

        let _tx3 = mgr.begin();

        // tx1 is still the oldest active transaction.
        assert_eq!(mgr.min_active_epoch(), epoch1);
    }

    /// `abort_all_active` aborts only transactions that are still active.
    #[test]
    fn test_abort_all_active() {
        let mgr = TransactionManager::new();

        let tx1 = mgr.begin();
        let tx2 = mgr.begin();
        let tx3 = mgr.begin();

        mgr.commit(tx1).unwrap();
        mgr.abort_all_active();

        // The committed transaction is left untouched.
        assert_eq!(mgr.state(tx1), Some(TxState::Committed));
        assert_eq!(mgr.state(tx2), Some(TxState::Aborted));
        assert_eq!(mgr.state(tx3), Some(TxState::Aborted));
    }

    /// A transaction started after a commit gets a later start epoch.
    #[test]
    fn test_start_epoch_snapshot() {
        let mgr = TransactionManager::new();

        let tx1 = mgr.begin();
        let start1 = mgr.start_epoch(tx1).unwrap();

        // Committing advances the global epoch.
        mgr.commit(tx1).unwrap();

        let tx2 = mgr.begin();
        let start2 = mgr.start_epoch(tx2).unwrap();

        assert!(start2.as_u64() > start1.as_u64());
    }

    /// Two concurrent transactions writing the same entity: the first
    /// committer wins and the second gets a write-write conflict.
    #[test]
    fn test_write_write_conflict_detection() {
        let mgr = TransactionManager::new();

        // Both transactions start before either commits.
        let tx1 = mgr.begin();
        let tx2 = mgr.begin();

        let entity = NodeId::new(42);
        mgr.record_write(tx1, entity).unwrap();
        mgr.record_write(tx2, entity).unwrap();

        let result1 = mgr.commit(tx1);
        assert!(result1.is_ok());

        // tx1 committed after tx2 started and wrote the same entity.
        let result2 = mgr.commit(tx2);
        assert!(result2.is_err());
        assert!(
            result2
                .unwrap_err()
                .to_string()
                .contains("Write-write conflict"),
            "Expected write-write conflict error"
        );
    }

    /// Sequential commits receive strictly increasing epochs.
    #[test]
    fn test_commit_epoch_monotonicity() {
        let mgr = TransactionManager::new();

        let mut epochs = Vec::new();

        for _ in 0..10 {
            let tx = mgr.begin();
            let epoch = mgr.commit(tx).unwrap();
            epochs.push(epoch.as_u64());
        }

        for i in 1..epochs.len() {
            assert!(
                epochs[i] > epochs[i - 1],
                "Epoch {} ({}) should be greater than epoch {} ({})",
                i,
                epochs[i],
                i - 1,
                epochs[i - 1]
            );
        }
    }

    /// Commits issued from several threads all get distinct epochs, and the
    /// final epoch equals the total number of commits.
    #[test]
    fn test_concurrent_commits_via_threads() {
        use std::sync::Arc;
        use std::thread;

        let mgr = Arc::new(TransactionManager::new());
        let num_threads = 10;
        let commits_per_thread = 100;

        // The transactions record no reads or writes, so they never conflict
        // and every `commit` is expected to succeed.
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let mgr = Arc::clone(&mgr);
                thread::spawn(move || {
                    let mut epochs = Vec::new();
                    for _ in 0..commits_per_thread {
                        let tx = mgr.begin();
                        let epoch = mgr.commit(tx).unwrap();
                        epochs.push(epoch.as_u64());
                    }
                    epochs
                })
            })
            .collect();

        let mut all_epochs: Vec<u64> = handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect();

        // Sort so that `dedup` removes every duplicate, then compare lengths.
        all_epochs.sort();
        let unique_count = all_epochs.len();
        all_epochs.dedup();
        assert_eq!(
            all_epochs.len(),
            unique_count,
            "All commit epochs should be unique"
        );

        // The epoch starts at 0 and each commit increments it by exactly one.
        assert_eq!(
            mgr.current_epoch().as_u64(),
            (num_threads * commits_per_thread) as u64,
            "Final epoch should equal total commits"
        );
    }

    /// `begin` uses snapshot isolation.
    #[test]
    fn test_isolation_level_default() {
        let mgr = TransactionManager::new();

        let tx = mgr.begin();
        assert_eq!(
            mgr.isolation_level(tx),
            Some(IsolationLevel::SnapshotIsolation)
        );
    }

    /// `begin_with_isolation` stores the requested level.
    #[test]
    fn test_isolation_level_explicit() {
        let mgr = TransactionManager::new();

        let tx_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
        let tx_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
        let tx_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);

        assert_eq!(
            mgr.isolation_level(tx_rc),
            Some(IsolationLevel::ReadCommitted)
        );
        assert_eq!(
            mgr.isolation_level(tx_si),
            Some(IsolationLevel::SnapshotIsolation)
        );
        assert_eq!(
            mgr.isolation_level(tx_ser),
            Some(IsolationLevel::Serializable)
        );
    }

    /// A serializable reader fails to commit when a concurrent transaction
    /// committed a write to an entity it read.
    #[test]
    fn test_ssi_read_write_conflict_detected() {
        let mgr = TransactionManager::new();

        // Reader, serializable.
        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);

        // Writer, started concurrently at the default level.
        let tx2 = mgr.begin();

        let entity = NodeId::new(42);
        mgr.record_read(tx1, entity).unwrap();

        // The writer commits after the reader started.
        mgr.record_write(tx2, entity).unwrap();
        mgr.commit(tx2).unwrap();

        let result = mgr.commit(tx1);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Serialization failure"),
            "Expected serialization failure error"
        );
    }

    /// Same schedule as above, but the reader uses snapshot isolation, so its
    /// read set is not validated.
    #[test]
    fn test_ssi_no_conflict_when_not_serializable() {
        let mgr = TransactionManager::new();

        // Reader at the default (snapshot isolation) level.
        let tx1 = mgr.begin();

        let tx2 = mgr.begin();

        let entity = NodeId::new(42);
        mgr.record_read(tx1, entity).unwrap();

        mgr.record_write(tx2, entity).unwrap();
        mgr.commit(tx2).unwrap();

        let result = mgr.commit(tx1);
        assert!(
            result.is_ok(),
            "Snapshot Isolation should not detect read-write conflicts"
        );
    }

    /// A serializable reader that starts after the writer committed does not
    /// conflict with it.
    #[test]
    fn test_ssi_no_conflict_when_write_before_read() {
        let mgr = TransactionManager::new();

        // The writer commits before the reader begins.
        let tx1 = mgr.begin();
        let entity = NodeId::new(42);
        mgr.record_write(tx1, entity).unwrap();
        mgr.commit(tx1).unwrap();

        // The reader's start epoch equals the writer's commit epoch.
        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
        mgr.record_read(tx2, entity).unwrap();

        let result = mgr.commit(tx2);
        assert!(
            result.is_ok(),
            "Should not conflict when writer committed before reader started"
        );
    }

    /// Write skew: both transactions read A and B, then write disjoint
    /// entities. Under `Serializable` the second committer must fail.
    #[test]
    fn test_write_skew_prevented_by_ssi() {
        let mgr = TransactionManager::new();

        let account_a = NodeId::new(1);
        let account_b = NodeId::new(2);

        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);

        // Both read both accounts.
        mgr.record_read(tx1, account_a).unwrap();
        mgr.record_read(tx1, account_b).unwrap();
        mgr.record_read(tx2, account_a).unwrap();
        mgr.record_read(tx2, account_b).unwrap();

        // Each writes a different account, so there is no write-write conflict.
        mgr.record_write(tx1, account_a).unwrap();
        mgr.record_write(tx2, account_b).unwrap();

        let result1 = mgr.commit(tx1);
        assert!(result1.is_ok(), "First commit should succeed");

        // tx2 read account_a, which tx1 wrote and committed concurrently.
        let result2 = mgr.commit(tx2);
        assert!(result2.is_err(), "Second commit should fail due to SSI");
        assert!(
            result2
                .unwrap_err()
                .to_string()
                .contains("Serialization failure"),
            "Expected serialization failure error for write skew prevention"
        );
    }

    /// A read-committed reader commits even though a concurrent transaction
    /// overwrote an entity it read.
    #[test]
    fn test_read_committed_allows_non_repeatable_reads() {
        let mgr = TransactionManager::new();

        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
        let entity = NodeId::new(42);

        mgr.record_read(tx1, entity).unwrap();

        // Another transaction writes the same entity and commits.
        let tx2 = mgr.begin();
        mgr.record_write(tx2, entity).unwrap();
        mgr.commit(tx2).unwrap();

        let result = mgr.commit(tx1);
        assert!(
            result.is_ok(),
            "ReadCommitted should allow non-repeatable reads"
        );
    }

    /// The derived `Debug` output is the bare variant name.
    #[test]
    fn test_isolation_level_debug() {
        assert_eq!(
            format!("{:?}", IsolationLevel::ReadCommitted),
            "ReadCommitted"
        );
        assert_eq!(
            format!("{:?}", IsolationLevel::SnapshotIsolation),
            "SnapshotIsolation"
        );
        assert_eq!(
            format!("{:?}", IsolationLevel::Serializable),
            "Serializable"
        );
    }

    /// `Default` for `IsolationLevel` is snapshot isolation.
    #[test]
    fn test_isolation_level_default_trait() {
        let default: IsolationLevel = Default::default();
        assert_eq!(default, IsolationLevel::SnapshotIsolation);
    }

    /// Two serializable transactions that only read the same entity never
    /// conflict.
    #[test]
    fn test_ssi_concurrent_reads_no_conflict() {
        let mgr = TransactionManager::new();

        let entity = NodeId::new(42);

        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);

        mgr.record_read(tx1, entity).unwrap();
        mgr.record_read(tx2, entity).unwrap();

        // Neither transaction wrote anything, so both commits succeed.
        assert!(mgr.commit(tx1).is_ok());
        assert!(mgr.commit(tx2).is_ok());
    }

    /// Write-write conflicts are still detected under `Serializable`.
    #[test]
    fn test_ssi_write_write_conflict() {
        let mgr = TransactionManager::new();

        let entity = NodeId::new(42);

        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);

        mgr.record_write(tx1, entity).unwrap();
        mgr.record_write(tx2, entity).unwrap();

        assert!(mgr.commit(tx1).is_ok());

        // tx2 wrote the same entity as the concurrently committed tx1.
        let result = mgr.commit(tx2);
        assert!(result.is_err());
    }
}