Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the transaction manager.
///
/// A transaction is created as `Active` and is later moved to `Committed`
/// or `Aborted`. Commit, abort and read/write recording are rejected for
/// transactions that are no longer `Active`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// The transaction is in progress and may still record reads and writes.
    Active,
    /// The transaction committed successfully.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
21
/// Transaction isolation level.
///
/// Controls the consistency guarantees and performance tradeoffs for transactions.
///
/// # Comparison
///
/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
/// |-------|-------------|----------------------|---------------|------------|
/// | ReadCommitted | No | Yes | Yes | Yes |
/// | SnapshotIsolation | No | No | No | Yes |
/// | Serializable | No | No | No | No |
///
/// # Performance
///
/// Higher isolation levels require more bookkeeping:
/// - `ReadCommitted`: Only tracks writes
/// - `SnapshotIsolation`: Tracks writes + snapshot versioning
/// - `Serializable`: Tracks writes + reads + SSI validation
///
/// # Commit-time validation
///
/// At commit time the transaction manager in this module checks write-write
/// conflicts for every level and additionally validates the read set only for
/// `Serializable`. The read-visibility differences between `ReadCommitted` and
/// `SnapshotIsolation` are not enforced in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read Committed: sees only committed data, but may see different
    /// versions of the same row within a transaction.
    ///
    /// Lowest overhead, highest throughput, but weaker consistency.
    ReadCommitted,

    /// Snapshot Isolation (default): each transaction sees a consistent
    /// snapshot as of transaction start. Prevents non-repeatable reads
    /// and phantom reads.
    ///
    /// Vulnerable to the write skew anomaly.
    #[default]
    SnapshotIsolation,

    /// Serializable Snapshot Isolation (SSI): provides full serializability
    /// by detecting read-write conflicts in addition to write-write conflicts.
    ///
    /// Prevents all anomalies including write skew, but may reject more
    /// commits due to stricter conflict detection.
    Serializable,
}
63
/// Identifier of a graph entity (node or edge) as tracked in a transaction's
/// read and write sets.
///
/// Hashable and `Copy` so it can be stored directly in `HashSet`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityId {
    /// A node, identified by its `NodeId`.
    Node(NodeId),
    /// An edge, identified by its `EdgeId`.
    Edge(EdgeId),
}
72
73impl From<NodeId> for EntityId {
74    fn from(id: NodeId) -> Self {
75        Self::Node(id)
76    }
77}
78
79impl From<EdgeId> for EntityId {
80    fn from(id: EdgeId) -> Self {
81        Self::Edge(id)
82    }
83}
84
/// Bookkeeping record for a single transaction.
///
/// Records are created in the `Active` state and remain in the manager after
/// commit or abort until garbage collection removes them, because the write
/// set of a committed transaction is consulted when later commits are validated.
pub struct TransactionInfo {
    /// Current lifecycle state of the transaction.
    pub state: TransactionState,
    /// Isolation level for this transaction.
    pub isolation_level: IsolationLevel,
    /// Start epoch (snapshot epoch for reads).
    pub start_epoch: EpochId,
    /// Set of entities written by this transaction.
    pub write_set: HashSet<EntityId>,
    /// Set of entities read by this transaction (only validated at commit
    /// time for serializable isolation).
    pub read_set: HashSet<EntityId>,
}
98
99impl TransactionInfo {
100    /// Creates a new transaction info with the given isolation level.
101    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102        Self {
103            state: TransactionState::Active,
104            isolation_level,
105            start_epoch,
106            write_set: HashSet::new(),
107            read_set: HashSet::new(),
108        }
109    }
110}
111
/// Manages transactions and MVCC versioning.
///
/// Hands out transaction IDs, tracks each transaction's state and read/write
/// sets, advances the global epoch on every successful commit, and performs
/// commit-time conflict detection.
pub struct TransactionManager {
    /// Next transaction ID to hand out.
    next_transaction_id: AtomicU64,
    /// Current epoch; incremented by one on every successful commit.
    current_epoch: AtomicU64,
    /// All tracked transactions (active, committed and aborted) until they
    /// are garbage collected.
    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
    /// Committed transaction epochs (for conflict detection).
    /// Maps TransactionId -> commit epoch.
    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
}
124
125impl TransactionManager {
126    /// Creates a new transaction manager.
127    #[must_use]
128    pub fn new() -> Self {
129        Self {
130            // Start at 2 to avoid collision with TransactionId::SYSTEM (which is 1)
131            // TransactionId::INVALID = u64::MAX, TransactionId::SYSTEM = 1, user transactions start at 2
132            next_transaction_id: AtomicU64::new(2),
133            current_epoch: AtomicU64::new(0),
134            transactions: RwLock::new(FxHashMap::default()),
135            committed_epochs: RwLock::new(FxHashMap::default()),
136        }
137    }
138
139    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
140    pub fn begin(&self) -> TransactionId {
141        self.begin_with_isolation(IsolationLevel::default())
142    }
143
144    /// Begins a new transaction with the specified isolation level.
145    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
146        let transaction_id =
147            TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
148        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
149
150        let info = TransactionInfo::new(epoch, isolation_level);
151        self.transactions.write().insert(transaction_id, info);
152        transaction_id
153    }
154
155    /// Returns the isolation level of a transaction.
156    pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
157        self.transactions
158            .read()
159            .get(&transaction_id)
160            .map(|info| info.isolation_level)
161    }
162
163    /// Records a write operation for the transaction.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the transaction is not active.
168    pub fn record_write(
169        &self,
170        transaction_id: TransactionId,
171        entity: impl Into<EntityId>,
172    ) -> Result<()> {
173        let mut txns = self.transactions.write();
174        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
175            Error::Transaction(TransactionError::InvalidState(
176                "Transaction not found".to_string(),
177            ))
178        })?;
179
180        if info.state != TransactionState::Active {
181            return Err(Error::Transaction(TransactionError::InvalidState(
182                "Transaction is not active".to_string(),
183            )));
184        }
185
186        info.write_set.insert(entity.into());
187        Ok(())
188    }
189
190    /// Records a read operation for the transaction (for serializable isolation).
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the transaction is not active.
195    pub fn record_read(
196        &self,
197        transaction_id: TransactionId,
198        entity: impl Into<EntityId>,
199    ) -> Result<()> {
200        let mut txns = self.transactions.write();
201        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
202            Error::Transaction(TransactionError::InvalidState(
203                "Transaction not found".to_string(),
204            ))
205        })?;
206
207        if info.state != TransactionState::Active {
208            return Err(Error::Transaction(TransactionError::InvalidState(
209                "Transaction is not active".to_string(),
210            )));
211        }
212
213        info.read_set.insert(entity.into());
214        Ok(())
215    }
216
217    /// Commits a transaction with conflict detection.
218    ///
219    /// # Conflict Detection
220    ///
221    /// - **All isolation levels**: Write-write conflicts (two transactions writing
222    ///   to the same entity) are always detected and cause the second committer to abort.
223    ///
224    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
225    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
226    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if:
231    /// - The transaction is not active
232    /// - There's a write-write conflict with another committed transaction
233    /// - (Serializable only) There's a read-write conflict (SSI violation)
234    pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
235        let mut txns = self.transactions.write();
236        let committed = self.committed_epochs.read();
237
238        // First, validate the transaction exists and is active
239        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
240            let info = txns.get(&transaction_id).ok_or_else(|| {
241                Error::Transaction(TransactionError::InvalidState(
242                    "Transaction not found".to_string(),
243                ))
244            })?;
245
246            if info.state != TransactionState::Active {
247                return Err(Error::Transaction(TransactionError::InvalidState(
248                    "Transaction is not active".to_string(),
249                )));
250            }
251
252            (
253                info.isolation_level,
254                info.start_epoch,
255                info.write_set.clone(),
256                info.read_set.clone(),
257            )
258        };
259
260        // Check for write-write conflicts with transactions that committed
261        // after our snapshot (i.e., concurrent writers to the same entities).
262        // Transactions committed before our start_epoch are part of our visible
263        // snapshot, so overwriting their values is not a conflict.
264        for (other_tx, commit_epoch) in committed.iter() {
265            if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
266                // Check if that transaction wrote to any of our entities
267                if let Some(other_info) = txns.get(other_tx) {
268                    for entity in &our_write_set {
269                        if other_info.write_set.contains(entity) {
270                            return Err(Error::Transaction(TransactionError::WriteConflict(
271                                format!("Write-write conflict on entity {:?}", entity),
272                            )));
273                        }
274                    }
275                }
276            }
277        }
278
279        // SSI validation for Serializable isolation level
280        // Check for read-write conflicts: if we read an entity that another
281        // transaction (that committed after we started) wrote, we have a
282        // "rw-antidependency" which can cause write skew.
283        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
284            for (other_tx, commit_epoch) in committed.iter() {
285                if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
286                    // Check if that transaction wrote to any entity we read
287                    if let Some(other_info) = txns.get(other_tx) {
288                        for entity in &our_read_set {
289                            if other_info.write_set.contains(entity) {
290                                return Err(Error::Transaction(
291                                    TransactionError::SerializationFailure(format!(
292                                        "Read-write conflict on entity {:?}: \
293                                         another transaction modified data we read",
294                                        entity
295                                    )),
296                                ));
297                            }
298                        }
299                    }
300                }
301            }
302
303            // Also check against transactions that are already marked committed
304            // but not yet in committed_epochs map
305            for (other_tx, other_info) in txns.iter() {
306                if *other_tx == transaction_id {
307                    continue;
308                }
309                if other_info.state == TransactionState::Committed {
310                    // If we can see their write set and we read something they wrote
311                    for entity in &our_read_set {
312                        if other_info.write_set.contains(entity) {
313                            // Check if they committed after we started
314                            if let Some(commit_epoch) = committed.get(other_tx)
315                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
316                            {
317                                return Err(Error::Transaction(
318                                    TransactionError::SerializationFailure(format!(
319                                        "Read-write conflict on entity {:?}: \
320                                             another transaction modified data we read",
321                                        entity
322                                    )),
323                                ));
324                            }
325                        }
326                    }
327                }
328            }
329        }
330
331        // Commit successful - advance epoch atomically
332        // SeqCst ensures all threads see commits in a consistent total order
333        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
334
335        // Now update state
336        if let Some(info) = txns.get_mut(&transaction_id) {
337            info.state = TransactionState::Committed;
338        }
339
340        // Record commit epoch (need to drop read lock first)
341        drop(committed);
342        self.committed_epochs
343            .write()
344            .insert(transaction_id, commit_epoch);
345
346        Ok(commit_epoch)
347    }
348
349    /// Aborts a transaction.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if the transaction is not active.
354    pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
355        let mut txns = self.transactions.write();
356
357        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
358            Error::Transaction(TransactionError::InvalidState(
359                "Transaction not found".to_string(),
360            ))
361        })?;
362
363        if info.state != TransactionState::Active {
364            return Err(Error::Transaction(TransactionError::InvalidState(
365                "Transaction is not active".to_string(),
366            )));
367        }
368
369        info.state = TransactionState::Aborted;
370        Ok(())
371    }
372
373    /// Returns the write set of a transaction.
374    ///
375    /// This returns a copy of the entities written by this transaction,
376    /// used for rollback to discard uncommitted versions.
377    pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
378        let txns = self.transactions.read();
379        let info = txns.get(&transaction_id).ok_or_else(|| {
380            Error::Transaction(TransactionError::InvalidState(
381                "Transaction not found".to_string(),
382            ))
383        })?;
384        Ok(info.write_set.clone())
385    }
386
387    /// Replaces the write set of a transaction (used for savepoint rollback).
388    ///
389    /// # Errors
390    ///
391    /// Returns an error if the transaction is not found.
392    pub fn reset_write_set(
393        &self,
394        transaction_id: TransactionId,
395        write_set: HashSet<EntityId>,
396    ) -> Result<()> {
397        let mut txns = self.transactions.write();
398        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
399            Error::Transaction(TransactionError::InvalidState(
400                "Transaction not found".to_string(),
401            ))
402        })?;
403        info.write_set = write_set;
404        Ok(())
405    }
406
407    /// Aborts all active transactions.
408    ///
409    /// Used during database shutdown.
410    pub fn abort_all_active(&self) {
411        let mut txns = self.transactions.write();
412        for info in txns.values_mut() {
413            if info.state == TransactionState::Active {
414                info.state = TransactionState::Aborted;
415            }
416        }
417    }
418
419    /// Returns the state of a transaction.
420    pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
421        self.transactions
422            .read()
423            .get(&transaction_id)
424            .map(|info| info.state)
425    }
426
427    /// Returns the start epoch of a transaction.
428    pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
429        self.transactions
430            .read()
431            .get(&transaction_id)
432            .map(|info| info.start_epoch)
433    }
434
435    /// Returns the current epoch.
436    #[must_use]
437    pub fn current_epoch(&self) -> EpochId {
438        EpochId::new(self.current_epoch.load(Ordering::Acquire))
439    }
440
441    /// Returns the minimum epoch that must be preserved for active transactions.
442    ///
443    /// This is used for garbage collection - versions visible at this epoch
444    /// must be preserved.
445    #[must_use]
446    pub fn min_active_epoch(&self) -> EpochId {
447        let txns = self.transactions.read();
448        txns.values()
449            .filter(|info| info.state == TransactionState::Active)
450            .map(|info| info.start_epoch)
451            .min()
452            .unwrap_or_else(|| self.current_epoch())
453    }
454
455    /// Returns the number of active transactions.
456    #[must_use]
457    pub fn active_count(&self) -> usize {
458        self.transactions
459            .read()
460            .values()
461            .filter(|info| info.state == TransactionState::Active)
462            .count()
463    }
464
465    /// Cleans up completed transactions that are no longer needed for conflict detection.
466    ///
467    /// A committed transaction's write set must be preserved until all transactions
468    /// that started before its commit have completed. This ensures write-write
469    /// conflict detection works correctly.
470    ///
471    /// Returns the number of transactions cleaned up.
472    pub fn gc(&self) -> usize {
473        let mut txns = self.transactions.write();
474        let mut committed = self.committed_epochs.write();
475
476        // Find the minimum start epoch among active transactions
477        let min_active_start = txns
478            .values()
479            .filter(|info| info.state == TransactionState::Active)
480            .map(|info| info.start_epoch)
481            .min();
482
483        let initial_count = txns.len();
484
485        // Collect transactions safe to remove
486        let to_remove: Vec<TransactionId> = txns
487            .iter()
488            .filter(|(transaction_id, info)| {
489                match info.state {
490                    TransactionState::Active => false, // Never remove active transactions
491                    TransactionState::Aborted => true, // Always safe to remove aborted transactions
492                    TransactionState::Committed => {
493                        // Only remove committed transactions if their commit epoch
494                        // is older than all active transactions' start epochs
495                        if let Some(min_start) = min_active_start {
496                            if let Some(commit_epoch) = committed.get(*transaction_id) {
497                                // Safe to remove if committed before all active txns started
498                                commit_epoch.as_u64() < min_start.as_u64()
499                            } else {
500                                // No commit epoch recorded, keep it to be safe
501                                false
502                            }
503                        } else {
504                            // No active transactions, safe to remove all committed
505                            true
506                        }
507                    }
508                }
509            })
510            .map(|(id, _)| *id)
511            .collect();
512
513        for id in &to_remove {
514            txns.remove(id);
515            committed.remove(id);
516        }
517
518        initial_count - txns.len()
519    }
520
521    /// Marks a transaction as committed at a specific epoch.
522    ///
523    /// Used during recovery to restore transaction state.
524    pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
525        self.committed_epochs.write().insert(transaction_id, epoch);
526    }
527
528    /// Returns the last assigned transaction ID.
529    ///
530    /// Returns `None` if no transactions have been started yet.
531    #[must_use]
532    pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
533        let next = self.next_transaction_id.load(Ordering::Relaxed);
534        if next > 1 {
535            Some(TransactionId::new(next - 1))
536        } else {
537            None
538        }
539    }
540}
541
542impl Default for TransactionManager {
543    fn default() -> Self {
544        Self::new()
545    }
546}
547
548#[cfg(test)]
549mod tests {
550    use super::*;
551
552    #[test]
553    fn test_begin_commit() {
554        let mgr = TransactionManager::new();
555
556        let tx = mgr.begin();
557        assert_eq!(mgr.state(tx), Some(TransactionState::Active));
558
559        let commit_epoch = mgr.commit(tx).unwrap();
560        assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
561        assert!(commit_epoch.as_u64() > 0);
562    }
563
564    #[test]
565    fn test_begin_abort() {
566        let mgr = TransactionManager::new();
567
568        let tx = mgr.begin();
569        mgr.abort(tx).unwrap();
570        assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
571    }
572
573    #[test]
574    fn test_epoch_advancement() {
575        let mgr = TransactionManager::new();
576
577        let initial_epoch = mgr.current_epoch();
578
579        let tx = mgr.begin();
580        let commit_epoch = mgr.commit(tx).unwrap();
581
582        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
583        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
584    }
585
586    #[test]
587    fn test_gc_preserves_needed_write_sets() {
588        let mgr = TransactionManager::new();
589
590        let tx1 = mgr.begin();
591        let tx2 = mgr.begin();
592
593        mgr.commit(tx1).unwrap();
594        // tx2 still active - started before tx1 committed
595
596        assert_eq!(mgr.active_count(), 1);
597
598        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
599        let cleaned = mgr.gc();
600        assert_eq!(cleaned, 0);
601
602        // Both transactions should remain
603        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
604        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
605    }
606
607    #[test]
608    fn test_gc_removes_old_commits() {
609        let mgr = TransactionManager::new();
610
611        // tx1 commits at epoch 1
612        let tx1 = mgr.begin();
613        mgr.commit(tx1).unwrap();
614
615        // tx2 starts at epoch 1, commits at epoch 2
616        let tx2 = mgr.begin();
617        mgr.commit(tx2).unwrap();
618
619        // tx3 starts at epoch 2
620        let tx3 = mgr.begin();
621
622        // At this point:
623        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
624        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
625        let cleaned = mgr.gc();
626        assert_eq!(cleaned, 1); // Only tx1 removed
627
628        assert_eq!(mgr.state(tx1), None);
629        assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); // Preserved for conflict detection
630        assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
631
632        // After tx3 commits, tx2 can be GC'd
633        mgr.commit(tx3).unwrap();
634        let cleaned = mgr.gc();
635        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
636    }
637
638    #[test]
639    fn test_gc_removes_aborted() {
640        let mgr = TransactionManager::new();
641
642        let tx1 = mgr.begin();
643        let tx2 = mgr.begin();
644
645        mgr.abort(tx1).unwrap();
646        // tx2 still active
647
648        // Aborted transactions are always safe to remove
649        let cleaned = mgr.gc();
650        assert_eq!(cleaned, 1);
651
652        assert_eq!(mgr.state(tx1), None);
653        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
654    }
655
656    #[test]
657    fn test_write_tracking() {
658        let mgr = TransactionManager::new();
659
660        let tx = mgr.begin();
661
662        // Record writes
663        mgr.record_write(tx, NodeId::new(1)).unwrap();
664        mgr.record_write(tx, NodeId::new(2)).unwrap();
665        mgr.record_write(tx, EdgeId::new(100)).unwrap();
666
667        // Should commit successfully (no conflicts)
668        assert!(mgr.commit(tx).is_ok());
669    }
670
671    #[test]
672    fn test_min_active_epoch() {
673        let mgr = TransactionManager::new();
674
675        // No active transactions - should return current epoch
676        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
677
678        // Start some transactions
679        let tx1 = mgr.begin();
680        let epoch1 = mgr.start_epoch(tx1).unwrap();
681
682        // Advance epoch
683        let tx2 = mgr.begin();
684        mgr.commit(tx2).unwrap();
685
686        let _tx3 = mgr.begin();
687
688        // min_active_epoch should be tx1's start epoch (earliest active)
689        assert_eq!(mgr.min_active_epoch(), epoch1);
690    }
691
692    #[test]
693    fn test_abort_all_active() {
694        let mgr = TransactionManager::new();
695
696        let tx1 = mgr.begin();
697        let tx2 = mgr.begin();
698        let tx3 = mgr.begin();
699
700        mgr.commit(tx1).unwrap();
701        // tx2 and tx3 still active
702
703        mgr.abort_all_active();
704
705        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); // Already committed
706        assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
707        assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
708    }
709
710    #[test]
711    fn test_start_epoch_snapshot() {
712        let mgr = TransactionManager::new();
713
714        // Start epoch for tx1
715        let tx1 = mgr.begin();
716        let start1 = mgr.start_epoch(tx1).unwrap();
717
718        // Commit tx1, advancing epoch
719        mgr.commit(tx1).unwrap();
720
721        // Start tx2 after epoch advanced
722        let tx2 = mgr.begin();
723        let start2 = mgr.start_epoch(tx2).unwrap();
724
725        // tx2 should have a later start epoch
726        assert!(start2.as_u64() > start1.as_u64());
727    }
728
729    #[test]
730    fn test_write_write_conflict_detection() {
731        let mgr = TransactionManager::new();
732
733        // Both transactions start at the same epoch
734        let tx1 = mgr.begin();
735        let tx2 = mgr.begin();
736
737        // Both try to write to the same entity
738        let entity = NodeId::new(42);
739        mgr.record_write(tx1, entity).unwrap();
740        mgr.record_write(tx2, entity).unwrap();
741
742        // First commit succeeds
743        let result1 = mgr.commit(tx1);
744        assert!(result1.is_ok());
745
746        // Second commit should fail due to write-write conflict
747        let result2 = mgr.commit(tx2);
748        assert!(result2.is_err());
749        assert!(
750            result2
751                .unwrap_err()
752                .to_string()
753                .contains("Write-write conflict"),
754            "Expected write-write conflict error"
755        );
756    }
757
758    #[test]
759    fn test_commit_epoch_monotonicity() {
760        let mgr = TransactionManager::new();
761
762        let mut epochs = Vec::new();
763
764        // Commit multiple transactions and verify epochs are strictly increasing
765        for _ in 0..10 {
766            let tx = mgr.begin();
767            let epoch = mgr.commit(tx).unwrap();
768            epochs.push(epoch.as_u64());
769        }
770
771        // Verify strict monotonicity
772        for i in 1..epochs.len() {
773            assert!(
774                epochs[i] > epochs[i - 1],
775                "Epoch {} ({}) should be greater than epoch {} ({})",
776                i,
777                epochs[i],
778                i - 1,
779                epochs[i - 1]
780            );
781        }
782    }
783
784    #[test]
785    fn test_concurrent_commits_via_threads() {
786        use std::sync::Arc;
787        use std::thread;
788
789        let mgr = Arc::new(TransactionManager::new());
790        let num_threads = 10;
791        let commits_per_thread = 100;
792
793        let handles: Vec<_> = (0..num_threads)
794            .map(|_| {
795                let mgr = Arc::clone(&mgr);
796                thread::spawn(move || {
797                    let mut epochs = Vec::new();
798                    for _ in 0..commits_per_thread {
799                        let tx = mgr.begin();
800                        let epoch = mgr.commit(tx).unwrap();
801                        epochs.push(epoch.as_u64());
802                    }
803                    epochs
804                })
805            })
806            .collect();
807
808        let mut all_epochs: Vec<u64> = handles
809            .into_iter()
810            .flat_map(|h| h.join().unwrap())
811            .collect();
812
813        // All epochs should be unique (no duplicates)
814        all_epochs.sort_unstable();
815        let unique_count = all_epochs.len();
816        all_epochs.dedup();
817        assert_eq!(
818            all_epochs.len(),
819            unique_count,
820            "All commit epochs should be unique"
821        );
822
823        // Final epoch should equal number of commits
824        assert_eq!(
825            mgr.current_epoch().as_u64(),
826            (num_threads * commits_per_thread) as u64,
827            "Final epoch should equal total commits"
828        );
829    }
830
831    #[test]
832    fn test_isolation_level_default() {
833        let mgr = TransactionManager::new();
834
835        let tx = mgr.begin();
836        assert_eq!(
837            mgr.isolation_level(tx),
838            Some(IsolationLevel::SnapshotIsolation)
839        );
840    }
841
842    #[test]
843    fn test_isolation_level_explicit() {
844        let mgr = TransactionManager::new();
845
846        let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
847        let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
848        let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
849
850        assert_eq!(
851            mgr.isolation_level(transaction_rc),
852            Some(IsolationLevel::ReadCommitted)
853        );
854        assert_eq!(
855            mgr.isolation_level(transaction_si),
856            Some(IsolationLevel::SnapshotIsolation)
857        );
858        assert_eq!(
859            mgr.isolation_level(transaction_ser),
860            Some(IsolationLevel::Serializable)
861        );
862    }
863
864    #[test]
865    fn test_ssi_read_write_conflict_detected() {
866        let mgr = TransactionManager::new();
867
868        // tx1 starts with Serializable isolation
869        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
870
871        // tx2 starts and will modify an entity
872        let tx2 = mgr.begin();
873
874        // tx1 reads entity 42
875        let entity = NodeId::new(42);
876        mgr.record_read(tx1, entity).unwrap();
877
878        // tx2 writes to the same entity and commits
879        mgr.record_write(tx2, entity).unwrap();
880        mgr.commit(tx2).unwrap();
881
882        // tx1 tries to commit - should fail due to SSI read-write conflict
883        let result = mgr.commit(tx1);
884        assert!(result.is_err());
885        assert!(
886            result
887                .unwrap_err()
888                .to_string()
889                .contains("Serialization failure"),
890            "Expected serialization failure error"
891        );
892    }
893
894    #[test]
895    fn test_ssi_no_conflict_when_not_serializable() {
896        let mgr = TransactionManager::new();
897
898        // tx1 starts with default Snapshot Isolation
899        let tx1 = mgr.begin();
900
901        // tx2 starts and will modify an entity
902        let tx2 = mgr.begin();
903
904        // tx1 reads entity 42
905        let entity = NodeId::new(42);
906        mgr.record_read(tx1, entity).unwrap();
907
908        // tx2 writes to the same entity and commits
909        mgr.record_write(tx2, entity).unwrap();
910        mgr.commit(tx2).unwrap();
911
912        // tx1 should commit successfully (SI doesn't check read-write conflicts)
913        let result = mgr.commit(tx1);
914        assert!(
915            result.is_ok(),
916            "Snapshot Isolation should not detect read-write conflicts"
917        );
918    }
919
920    #[test]
921    fn test_ssi_no_conflict_when_write_before_read() {
922        let mgr = TransactionManager::new();
923
924        // tx1 writes and commits first
925        let tx1 = mgr.begin();
926        let entity = NodeId::new(42);
927        mgr.record_write(tx1, entity).unwrap();
928        mgr.commit(tx1).unwrap();
929
930        // tx2 starts AFTER tx1 committed and reads the entity
931        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
932        mgr.record_read(tx2, entity).unwrap();
933
934        // tx2 should commit successfully (tx1 committed before tx2 started)
935        let result = mgr.commit(tx2);
936        assert!(
937            result.is_ok(),
938            "Should not conflict when writer committed before reader started"
939        );
940    }
941
942    #[test]
943    fn test_write_skew_prevented_by_ssi() {
944        // Classic write skew scenario:
945        // Account A = 50, Account B = 50, constraint: A + B >= 0
946        // T1 reads A, B, writes A = A - 100
947        // T2 reads A, B, writes B = B - 100
948        // Without SSI, both could commit violating the constraint.
949
950        let mgr = TransactionManager::new();
951
952        let account_a = NodeId::new(1);
953        let account_b = NodeId::new(2);
954
955        // T1 and T2 both start with Serializable isolation
956        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
957        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
958
959        // Both read both accounts
960        mgr.record_read(tx1, account_a).unwrap();
961        mgr.record_read(tx1, account_b).unwrap();
962        mgr.record_read(tx2, account_a).unwrap();
963        mgr.record_read(tx2, account_b).unwrap();
964
965        // T1 writes to A, T2 writes to B (no write-write conflict)
966        mgr.record_write(tx1, account_a).unwrap();
967        mgr.record_write(tx2, account_b).unwrap();
968
969        // T1 commits first
970        let result1 = mgr.commit(tx1);
971        assert!(result1.is_ok(), "First commit should succeed");
972
973        // T2 tries to commit - should fail because it read account_a which T1 wrote
974        let result2 = mgr.commit(tx2);
975        assert!(result2.is_err(), "Second commit should fail due to SSI");
976        assert!(
977            result2
978                .unwrap_err()
979                .to_string()
980                .contains("Serialization failure"),
981            "Expected serialization failure error for write skew prevention"
982        );
983    }
984
985    #[test]
986    fn test_read_committed_allows_non_repeatable_reads() {
987        let mgr = TransactionManager::new();
988
989        // tx1 starts with ReadCommitted isolation
990        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
991        let entity = NodeId::new(42);
992
993        // tx1 reads entity
994        mgr.record_read(tx1, entity).unwrap();
995
996        // tx2 writes and commits
997        let tx2 = mgr.begin();
998        mgr.record_write(tx2, entity).unwrap();
999        mgr.commit(tx2).unwrap();
1000
1001        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
1002        let result = mgr.commit(tx1);
1003        assert!(
1004            result.is_ok(),
1005            "ReadCommitted should allow non-repeatable reads"
1006        );
1007    }
1008
1009    #[test]
1010    fn test_isolation_level_debug() {
1011        assert_eq!(
1012            format!("{:?}", IsolationLevel::ReadCommitted),
1013            "ReadCommitted"
1014        );
1015        assert_eq!(
1016            format!("{:?}", IsolationLevel::SnapshotIsolation),
1017            "SnapshotIsolation"
1018        );
1019        assert_eq!(
1020            format!("{:?}", IsolationLevel::Serializable),
1021            "Serializable"
1022        );
1023    }
1024
1025    #[test]
1026    fn test_isolation_level_default_trait() {
1027        let default: IsolationLevel = Default::default();
1028        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1029    }
1030
1031    #[test]
1032    fn test_ssi_concurrent_reads_no_conflict() {
1033        let mgr = TransactionManager::new();
1034
1035        let entity = NodeId::new(42);
1036
1037        // Both transactions read the same entity
1038        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1039        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1040
1041        mgr.record_read(tx1, entity).unwrap();
1042        mgr.record_read(tx2, entity).unwrap();
1043
1044        // Both should commit successfully (read-read is not a conflict)
1045        assert!(mgr.commit(tx1).is_ok());
1046        assert!(mgr.commit(tx2).is_ok());
1047    }
1048
1049    #[test]
1050    fn test_ssi_write_write_conflict() {
1051        let mgr = TransactionManager::new();
1052
1053        let entity = NodeId::new(42);
1054
1055        // Both transactions write the same entity
1056        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1057        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1058
1059        mgr.record_write(tx1, entity).unwrap();
1060        mgr.record_write(tx2, entity).unwrap();
1061
1062        // First commit succeeds
1063        assert!(mgr.commit(tx1).is_ok());
1064
1065        // Second commit fails (write-write conflict)
1066        let result = mgr.commit(tx2);
1067        assert!(result.is_err());
1068    }
1069}