Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the transaction manager.
///
/// A transaction is created `Active`. `commit` moves it to `Committed` and
/// `abort` / `abort_all_active` move it to `Aborted`; both operations reject
/// transactions that are not `Active`, so the two end states are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxState {
    /// Transaction is in progress; reads and writes may still be recorded.
    Active,
    /// Transaction committed successfully and was assigned a commit epoch.
    Committed,
    /// Transaction was aborted (explicitly, or in bulk via `abort_all_active`).
    Aborted,
}
21
/// Transaction isolation level.
///
/// Controls the consistency guarantees and performance tradeoffs for transactions.
///
/// # Comparison
///
/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
/// |-------|-------------|----------------------|---------------|------------|
/// | ReadCommitted | No | Yes | Yes | Yes |
/// | SnapshotIsolation | No | No | No | Yes |
/// | Serializable | No | No | No | No |
///
/// # Performance
///
/// Higher isolation levels require more bookkeeping:
/// - `ReadCommitted`: Only tracks writes
/// - `SnapshotIsolation`: Tracks writes + snapshot versioning
/// - `Serializable`: Tracks writes + reads + SSI validation
///
/// # What this module enforces
///
/// Within the transaction manager the level only changes commit-time
/// validation: write-write conflicts are checked for every level, while the
/// read-write (SSI) check runs for `Serializable` transactions only. Read
/// visibility itself is not decided in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read Committed: sees only committed data, but may see different
    /// versions of the same row within a transaction.
    ///
    /// Lowest overhead, highest throughput, but weaker consistency.
    ReadCommitted,

    /// Snapshot Isolation (default): each transaction sees a consistent
    /// snapshot as of transaction start. Prevents non-repeatable reads
    /// and phantom reads.
    ///
    /// Vulnerable to write skew anomaly.
    #[default]
    SnapshotIsolation,

    /// Serializable Snapshot Isolation (SSI): provides full serializability
    /// by detecting read-write conflicts in addition to write-write conflicts.
    ///
    /// Prevents all anomalies including write skew, but may abort more
    /// transactions due to stricter conflict detection.
    Serializable,
}
63
/// Identifies a graph entity (node or edge) in a transaction's read/write sets.
///
/// Wrapping both ID kinds in one hashable enum lets a single `HashSet` hold
/// nodes and edges while keeping them distinct from each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityId {
    /// A node, identified by its `NodeId`.
    Node(NodeId),
    /// An edge, identified by its `EdgeId`.
    Edge(EdgeId),
}
72
73impl From<NodeId> for EntityId {
74    fn from(id: NodeId) -> Self {
75        Self::Node(id)
76    }
77}
78
79impl From<EdgeId> for EntityId {
80    fn from(id: EdgeId) -> Self {
81        Self::Edge(id)
82    }
83}
84
/// Bookkeeping record the manager keeps for each registered transaction.
///
/// Despite the name, records are not limited to active transactions: a record
/// stays in the manager's map after commit or abort until `gc` removes it,
/// because committed write sets are consulted by later conflict checks.
pub struct TxInfo {
    /// Current lifecycle state of the transaction.
    pub state: TxState,
    /// Isolation level chosen when the transaction began.
    pub isolation_level: IsolationLevel,
    /// Start epoch (snapshot epoch for reads); also the reference point for
    /// deciding which committed transactions count as concurrent.
    pub start_epoch: EpochId,
    /// Set of entities written by this transaction (filled by `record_write`).
    pub write_set: HashSet<EntityId>,
    /// Set of entities read by this transaction (filled by `record_read`);
    /// only consulted at commit time for serializable isolation.
    pub read_set: HashSet<EntityId>,
}
98
99impl TxInfo {
100    /// Creates a new transaction info with the given isolation level.
101    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102        Self {
103            state: TxState::Active,
104            isolation_level,
105            start_epoch,
106            write_set: HashSet::new(),
107            read_set: HashSet::new(),
108        }
109    }
110}
111
/// Manages transactions and MVCC versioning.
///
/// Hands out transaction IDs, tracks per-transaction read/write sets, detects
/// conflicts at commit time, and advances the global epoch on every commit.
///
/// Lock ordering: methods that need both maps (`commit`, `gc`) acquire
/// `transactions` before `committed_epochs`.
pub struct TransactionManager {
    /// Next transaction ID to hand out (monotonically increasing).
    next_tx_id: AtomicU64,
    /// Current epoch; incremented by one on every successful commit.
    current_epoch: AtomicU64,
    /// All registered transactions (active, committed and aborted) that have
    /// not yet been removed by `gc`.
    transactions: RwLock<FxHashMap<TxId, TxInfo>>,
    /// Committed transaction epochs (for conflict detection).
    /// Maps TxId -> commit epoch.
    committed_epochs: RwLock<FxHashMap<TxId, EpochId>>,
}
124
125impl TransactionManager {
126    /// Creates a new transaction manager.
127    #[must_use]
128    pub fn new() -> Self {
129        Self {
130            // Start at 2 to avoid collision with TxId::SYSTEM (which is 1)
131            // TxId::INVALID = 0, TxId::SYSTEM = 1, user transactions start at 2
132            next_tx_id: AtomicU64::new(2),
133            current_epoch: AtomicU64::new(0),
134            transactions: RwLock::new(FxHashMap::default()),
135            committed_epochs: RwLock::new(FxHashMap::default()),
136        }
137    }
138
139    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
140    pub fn begin(&self) -> TxId {
141        self.begin_with_isolation(IsolationLevel::default())
142    }
143
144    /// Begins a new transaction with the specified isolation level.
145    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TxId {
146        let tx_id = TxId::new(self.next_tx_id.fetch_add(1, Ordering::Relaxed));
147        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
148
149        let info = TxInfo::new(epoch, isolation_level);
150        self.transactions.write().insert(tx_id, info);
151        tx_id
152    }
153
154    /// Returns the isolation level of a transaction.
155    pub fn isolation_level(&self, tx_id: TxId) -> Option<IsolationLevel> {
156        self.transactions
157            .read()
158            .get(&tx_id)
159            .map(|info| info.isolation_level)
160    }
161
162    /// Records a write operation for the transaction.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the transaction is not active.
167    pub fn record_write(&self, tx_id: TxId, entity: impl Into<EntityId>) -> Result<()> {
168        let mut txns = self.transactions.write();
169        let info = txns.get_mut(&tx_id).ok_or_else(|| {
170            Error::Transaction(TransactionError::InvalidState(
171                "Transaction not found".to_string(),
172            ))
173        })?;
174
175        if info.state != TxState::Active {
176            return Err(Error::Transaction(TransactionError::InvalidState(
177                "Transaction is not active".to_string(),
178            )));
179        }
180
181        info.write_set.insert(entity.into());
182        Ok(())
183    }
184
185    /// Records a read operation for the transaction (for serializable isolation).
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the transaction is not active.
190    pub fn record_read(&self, tx_id: TxId, entity: impl Into<EntityId>) -> Result<()> {
191        let mut txns = self.transactions.write();
192        let info = txns.get_mut(&tx_id).ok_or_else(|| {
193            Error::Transaction(TransactionError::InvalidState(
194                "Transaction not found".to_string(),
195            ))
196        })?;
197
198        if info.state != TxState::Active {
199            return Err(Error::Transaction(TransactionError::InvalidState(
200                "Transaction is not active".to_string(),
201            )));
202        }
203
204        info.read_set.insert(entity.into());
205        Ok(())
206    }
207
208    /// Commits a transaction with conflict detection.
209    ///
210    /// # Conflict Detection
211    ///
212    /// - **All isolation levels**: Write-write conflicts (two transactions writing
213    ///   to the same entity) are always detected and cause the second committer to abort.
214    ///
215    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
216    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
217    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if:
222    /// - The transaction is not active
223    /// - There's a write-write conflict with another committed transaction
224    /// - (Serializable only) There's a read-write conflict (SSI violation)
225    pub fn commit(&self, tx_id: TxId) -> Result<EpochId> {
226        let mut txns = self.transactions.write();
227        let committed = self.committed_epochs.read();
228
229        // First, validate the transaction exists and is active
230        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
231            let info = txns.get(&tx_id).ok_or_else(|| {
232                Error::Transaction(TransactionError::InvalidState(
233                    "Transaction not found".to_string(),
234                ))
235            })?;
236
237            if info.state != TxState::Active {
238                return Err(Error::Transaction(TransactionError::InvalidState(
239                    "Transaction is not active".to_string(),
240                )));
241            }
242
243            (
244                info.isolation_level,
245                info.start_epoch,
246                info.write_set.clone(),
247                info.read_set.clone(),
248            )
249        };
250
251        // Check for write-write conflicts with other committed transactions
252        for (other_tx, other_info) in txns.iter() {
253            if *other_tx == tx_id {
254                continue;
255            }
256            if other_info.state == TxState::Committed {
257                // Check if any of our writes conflict with their writes
258                for entity in &our_write_set {
259                    if other_info.write_set.contains(entity) {
260                        return Err(Error::Transaction(TransactionError::WriteConflict(
261                            format!("Write-write conflict on entity {:?}", entity),
262                        )));
263                    }
264                }
265            }
266        }
267
268        // Also check against recently committed transactions
269        for (other_tx, commit_epoch) in committed.iter() {
270            if *other_tx != tx_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
271                // Check if that transaction wrote to any of our entities
272                if let Some(other_info) = txns.get(other_tx) {
273                    for entity in &our_write_set {
274                        if other_info.write_set.contains(entity) {
275                            return Err(Error::Transaction(TransactionError::WriteConflict(
276                                format!("Write-write conflict on entity {:?}", entity),
277                            )));
278                        }
279                    }
280                }
281            }
282        }
283
284        // SSI validation for Serializable isolation level
285        // Check for read-write conflicts: if we read an entity that another
286        // transaction (that committed after we started) wrote, we have a
287        // "rw-antidependency" which can cause write skew.
288        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
289            for (other_tx, commit_epoch) in committed.iter() {
290                if *other_tx != tx_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
291                    // Check if that transaction wrote to any entity we read
292                    if let Some(other_info) = txns.get(other_tx) {
293                        for entity in &our_read_set {
294                            if other_info.write_set.contains(entity) {
295                                return Err(Error::Transaction(
296                                    TransactionError::SerializationFailure(format!(
297                                        "Read-write conflict on entity {:?}: \
298                                         another transaction modified data we read",
299                                        entity
300                                    )),
301                                ));
302                            }
303                        }
304                    }
305                }
306            }
307
308            // Also check against transactions that are already marked committed
309            // but not yet in committed_epochs map
310            for (other_tx, other_info) in txns.iter() {
311                if *other_tx == tx_id {
312                    continue;
313                }
314                if other_info.state == TxState::Committed {
315                    // If we can see their write set and we read something they wrote
316                    for entity in &our_read_set {
317                        if other_info.write_set.contains(entity) {
318                            // Check if they committed after we started
319                            if let Some(commit_epoch) = committed.get(other_tx)
320                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
321                            {
322                                return Err(Error::Transaction(
323                                    TransactionError::SerializationFailure(format!(
324                                        "Read-write conflict on entity {:?}: \
325                                             another transaction modified data we read",
326                                        entity
327                                    )),
328                                ));
329                            }
330                        }
331                    }
332                }
333            }
334        }
335
336        // Commit successful - advance epoch atomically
337        // SeqCst ensures all threads see commits in a consistent total order
338        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
339
340        // Now update state
341        if let Some(info) = txns.get_mut(&tx_id) {
342            info.state = TxState::Committed;
343        }
344
345        // Record commit epoch (need to drop read lock first)
346        drop(committed);
347        self.committed_epochs.write().insert(tx_id, commit_epoch);
348
349        Ok(commit_epoch)
350    }
351
352    /// Aborts a transaction.
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if the transaction is not active.
357    pub fn abort(&self, tx_id: TxId) -> Result<()> {
358        let mut txns = self.transactions.write();
359
360        let info = txns.get_mut(&tx_id).ok_or_else(|| {
361            Error::Transaction(TransactionError::InvalidState(
362                "Transaction not found".to_string(),
363            ))
364        })?;
365
366        if info.state != TxState::Active {
367            return Err(Error::Transaction(TransactionError::InvalidState(
368                "Transaction is not active".to_string(),
369            )));
370        }
371
372        info.state = TxState::Aborted;
373        Ok(())
374    }
375
376    /// Returns the write set of a transaction.
377    ///
378    /// This returns a copy of the entities written by this transaction,
379    /// used for rollback to discard uncommitted versions.
380    pub fn get_write_set(&self, tx_id: TxId) -> Result<HashSet<EntityId>> {
381        let txns = self.transactions.read();
382        let info = txns.get(&tx_id).ok_or_else(|| {
383            Error::Transaction(TransactionError::InvalidState(
384                "Transaction not found".to_string(),
385            ))
386        })?;
387        Ok(info.write_set.clone())
388    }
389
390    /// Aborts all active transactions.
391    ///
392    /// Used during database shutdown.
393    pub fn abort_all_active(&self) {
394        let mut txns = self.transactions.write();
395        for info in txns.values_mut() {
396            if info.state == TxState::Active {
397                info.state = TxState::Aborted;
398            }
399        }
400    }
401
402    /// Returns the state of a transaction.
403    pub fn state(&self, tx_id: TxId) -> Option<TxState> {
404        self.transactions.read().get(&tx_id).map(|info| info.state)
405    }
406
407    /// Returns the start epoch of a transaction.
408    pub fn start_epoch(&self, tx_id: TxId) -> Option<EpochId> {
409        self.transactions
410            .read()
411            .get(&tx_id)
412            .map(|info| info.start_epoch)
413    }
414
415    /// Returns the current epoch.
416    #[must_use]
417    pub fn current_epoch(&self) -> EpochId {
418        EpochId::new(self.current_epoch.load(Ordering::Acquire))
419    }
420
421    /// Returns the minimum epoch that must be preserved for active transactions.
422    ///
423    /// This is used for garbage collection - versions visible at this epoch
424    /// must be preserved.
425    #[must_use]
426    pub fn min_active_epoch(&self) -> EpochId {
427        let txns = self.transactions.read();
428        txns.values()
429            .filter(|info| info.state == TxState::Active)
430            .map(|info| info.start_epoch)
431            .min()
432            .unwrap_or_else(|| self.current_epoch())
433    }
434
435    /// Returns the number of active transactions.
436    #[must_use]
437    pub fn active_count(&self) -> usize {
438        self.transactions
439            .read()
440            .values()
441            .filter(|info| info.state == TxState::Active)
442            .count()
443    }
444
445    /// Cleans up completed transactions that are no longer needed for conflict detection.
446    ///
447    /// A committed transaction's write set must be preserved until all transactions
448    /// that started before its commit have completed. This ensures write-write
449    /// conflict detection works correctly.
450    ///
451    /// Returns the number of transactions cleaned up.
452    pub fn gc(&self) -> usize {
453        let mut txns = self.transactions.write();
454        let mut committed = self.committed_epochs.write();
455
456        // Find the minimum start epoch among active transactions
457        let min_active_start = txns
458            .values()
459            .filter(|info| info.state == TxState::Active)
460            .map(|info| info.start_epoch)
461            .min();
462
463        let initial_count = txns.len();
464
465        // Collect transactions safe to remove
466        let to_remove: Vec<TxId> = txns
467            .iter()
468            .filter(|(tx_id, info)| {
469                match info.state {
470                    TxState::Active => false, // Never remove active transactions
471                    TxState::Aborted => true, // Always safe to remove aborted transactions
472                    TxState::Committed => {
473                        // Only remove committed transactions if their commit epoch
474                        // is older than all active transactions' start epochs
475                        if let Some(min_start) = min_active_start {
476                            if let Some(commit_epoch) = committed.get(*tx_id) {
477                                // Safe to remove if committed before all active txns started
478                                commit_epoch.as_u64() < min_start.as_u64()
479                            } else {
480                                // No commit epoch recorded, keep it to be safe
481                                false
482                            }
483                        } else {
484                            // No active transactions, safe to remove all committed
485                            true
486                        }
487                    }
488                }
489            })
490            .map(|(id, _)| *id)
491            .collect();
492
493        for id in &to_remove {
494            txns.remove(id);
495            committed.remove(id);
496        }
497
498        initial_count - txns.len()
499    }
500
501    /// Marks a transaction as committed at a specific epoch.
502    ///
503    /// Used during recovery to restore transaction state.
504    pub fn mark_committed(&self, tx_id: TxId, epoch: EpochId) {
505        self.committed_epochs.write().insert(tx_id, epoch);
506    }
507
508    /// Returns the last assigned transaction ID.
509    ///
510    /// Returns `None` if no transactions have been started yet.
511    #[must_use]
512    pub fn last_assigned_tx_id(&self) -> Option<TxId> {
513        let next = self.next_tx_id.load(Ordering::Relaxed);
514        if next > 1 {
515            Some(TxId::new(next - 1))
516        } else {
517            None
518        }
519    }
520}
521
522impl Default for TransactionManager {
523    fn default() -> Self {
524        Self::new()
525    }
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531
532    #[test]
533    fn test_begin_commit() {
534        let mgr = TransactionManager::new();
535
536        let tx = mgr.begin();
537        assert_eq!(mgr.state(tx), Some(TxState::Active));
538
539        let commit_epoch = mgr.commit(tx).unwrap();
540        assert_eq!(mgr.state(tx), Some(TxState::Committed));
541        assert!(commit_epoch.as_u64() > 0);
542    }
543
544    #[test]
545    fn test_begin_abort() {
546        let mgr = TransactionManager::new();
547
548        let tx = mgr.begin();
549        mgr.abort(tx).unwrap();
550        assert_eq!(mgr.state(tx), Some(TxState::Aborted));
551    }
552
553    #[test]
554    fn test_epoch_advancement() {
555        let mgr = TransactionManager::new();
556
557        let initial_epoch = mgr.current_epoch();
558
559        let tx = mgr.begin();
560        let commit_epoch = mgr.commit(tx).unwrap();
561
562        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
563        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
564    }
565
566    #[test]
567    fn test_gc_preserves_needed_write_sets() {
568        let mgr = TransactionManager::new();
569
570        let tx1 = mgr.begin();
571        let tx2 = mgr.begin();
572
573        mgr.commit(tx1).unwrap();
574        // tx2 still active - started before tx1 committed
575
576        assert_eq!(mgr.active_count(), 1);
577
578        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
579        let cleaned = mgr.gc();
580        assert_eq!(cleaned, 0);
581
582        // Both transactions should remain
583        assert_eq!(mgr.state(tx1), Some(TxState::Committed));
584        assert_eq!(mgr.state(tx2), Some(TxState::Active));
585    }
586
587    #[test]
588    fn test_gc_removes_old_commits() {
589        let mgr = TransactionManager::new();
590
591        // tx1 commits at epoch 1
592        let tx1 = mgr.begin();
593        mgr.commit(tx1).unwrap();
594
595        // tx2 starts at epoch 1, commits at epoch 2
596        let tx2 = mgr.begin();
597        mgr.commit(tx2).unwrap();
598
599        // tx3 starts at epoch 2
600        let tx3 = mgr.begin();
601
602        // At this point:
603        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
604        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
605        let cleaned = mgr.gc();
606        assert_eq!(cleaned, 1); // Only tx1 removed
607
608        assert_eq!(mgr.state(tx1), None);
609        assert_eq!(mgr.state(tx2), Some(TxState::Committed)); // Preserved for conflict detection
610        assert_eq!(mgr.state(tx3), Some(TxState::Active));
611
612        // After tx3 commits, tx2 can be GC'd
613        mgr.commit(tx3).unwrap();
614        let cleaned = mgr.gc();
615        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
616    }
617
618    #[test]
619    fn test_gc_removes_aborted() {
620        let mgr = TransactionManager::new();
621
622        let tx1 = mgr.begin();
623        let tx2 = mgr.begin();
624
625        mgr.abort(tx1).unwrap();
626        // tx2 still active
627
628        // Aborted transactions are always safe to remove
629        let cleaned = mgr.gc();
630        assert_eq!(cleaned, 1);
631
632        assert_eq!(mgr.state(tx1), None);
633        assert_eq!(mgr.state(tx2), Some(TxState::Active));
634    }
635
636    #[test]
637    fn test_write_tracking() {
638        let mgr = TransactionManager::new();
639
640        let tx = mgr.begin();
641
642        // Record writes
643        mgr.record_write(tx, NodeId::new(1)).unwrap();
644        mgr.record_write(tx, NodeId::new(2)).unwrap();
645        mgr.record_write(tx, EdgeId::new(100)).unwrap();
646
647        // Should commit successfully (no conflicts)
648        assert!(mgr.commit(tx).is_ok());
649    }
650
651    #[test]
652    fn test_min_active_epoch() {
653        let mgr = TransactionManager::new();
654
655        // No active transactions - should return current epoch
656        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
657
658        // Start some transactions
659        let tx1 = mgr.begin();
660        let epoch1 = mgr.start_epoch(tx1).unwrap();
661
662        // Advance epoch
663        let tx2 = mgr.begin();
664        mgr.commit(tx2).unwrap();
665
666        let _tx3 = mgr.begin();
667
668        // min_active_epoch should be tx1's start epoch (earliest active)
669        assert_eq!(mgr.min_active_epoch(), epoch1);
670    }
671
672    #[test]
673    fn test_abort_all_active() {
674        let mgr = TransactionManager::new();
675
676        let tx1 = mgr.begin();
677        let tx2 = mgr.begin();
678        let tx3 = mgr.begin();
679
680        mgr.commit(tx1).unwrap();
681        // tx2 and tx3 still active
682
683        mgr.abort_all_active();
684
685        assert_eq!(mgr.state(tx1), Some(TxState::Committed)); // Already committed
686        assert_eq!(mgr.state(tx2), Some(TxState::Aborted));
687        assert_eq!(mgr.state(tx3), Some(TxState::Aborted));
688    }
689
690    #[test]
691    fn test_start_epoch_snapshot() {
692        let mgr = TransactionManager::new();
693
694        // Start epoch for tx1
695        let tx1 = mgr.begin();
696        let start1 = mgr.start_epoch(tx1).unwrap();
697
698        // Commit tx1, advancing epoch
699        mgr.commit(tx1).unwrap();
700
701        // Start tx2 after epoch advanced
702        let tx2 = mgr.begin();
703        let start2 = mgr.start_epoch(tx2).unwrap();
704
705        // tx2 should have a later start epoch
706        assert!(start2.as_u64() > start1.as_u64());
707    }
708
709    #[test]
710    fn test_write_write_conflict_detection() {
711        let mgr = TransactionManager::new();
712
713        // Both transactions start at the same epoch
714        let tx1 = mgr.begin();
715        let tx2 = mgr.begin();
716
717        // Both try to write to the same entity
718        let entity = NodeId::new(42);
719        mgr.record_write(tx1, entity).unwrap();
720        mgr.record_write(tx2, entity).unwrap();
721
722        // First commit succeeds
723        let result1 = mgr.commit(tx1);
724        assert!(result1.is_ok());
725
726        // Second commit should fail due to write-write conflict
727        let result2 = mgr.commit(tx2);
728        assert!(result2.is_err());
729        assert!(
730            result2
731                .unwrap_err()
732                .to_string()
733                .contains("Write-write conflict"),
734            "Expected write-write conflict error"
735        );
736    }
737
738    #[test]
739    fn test_commit_epoch_monotonicity() {
740        let mgr = TransactionManager::new();
741
742        let mut epochs = Vec::new();
743
744        // Commit multiple transactions and verify epochs are strictly increasing
745        for _ in 0..10 {
746            let tx = mgr.begin();
747            let epoch = mgr.commit(tx).unwrap();
748            epochs.push(epoch.as_u64());
749        }
750
751        // Verify strict monotonicity
752        for i in 1..epochs.len() {
753            assert!(
754                epochs[i] > epochs[i - 1],
755                "Epoch {} ({}) should be greater than epoch {} ({})",
756                i,
757                epochs[i],
758                i - 1,
759                epochs[i - 1]
760            );
761        }
762    }
763
764    #[test]
765    fn test_concurrent_commits_via_threads() {
766        use std::sync::Arc;
767        use std::thread;
768
769        let mgr = Arc::new(TransactionManager::new());
770        let num_threads = 10;
771        let commits_per_thread = 100;
772
773        let handles: Vec<_> = (0..num_threads)
774            .map(|_| {
775                let mgr = Arc::clone(&mgr);
776                thread::spawn(move || {
777                    let mut epochs = Vec::new();
778                    for _ in 0..commits_per_thread {
779                        let tx = mgr.begin();
780                        let epoch = mgr.commit(tx).unwrap();
781                        epochs.push(epoch.as_u64());
782                    }
783                    epochs
784                })
785            })
786            .collect();
787
788        let mut all_epochs: Vec<u64> = handles
789            .into_iter()
790            .flat_map(|h| h.join().unwrap())
791            .collect();
792
793        // All epochs should be unique (no duplicates)
794        all_epochs.sort_unstable();
795        let unique_count = all_epochs.len();
796        all_epochs.dedup();
797        assert_eq!(
798            all_epochs.len(),
799            unique_count,
800            "All commit epochs should be unique"
801        );
802
803        // Final epoch should equal number of commits
804        assert_eq!(
805            mgr.current_epoch().as_u64(),
806            (num_threads * commits_per_thread) as u64,
807            "Final epoch should equal total commits"
808        );
809    }
810
811    #[test]
812    fn test_isolation_level_default() {
813        let mgr = TransactionManager::new();
814
815        let tx = mgr.begin();
816        assert_eq!(
817            mgr.isolation_level(tx),
818            Some(IsolationLevel::SnapshotIsolation)
819        );
820    }
821
822    #[test]
823    fn test_isolation_level_explicit() {
824        let mgr = TransactionManager::new();
825
826        let tx_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
827        let tx_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
828        let tx_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
829
830        assert_eq!(
831            mgr.isolation_level(tx_rc),
832            Some(IsolationLevel::ReadCommitted)
833        );
834        assert_eq!(
835            mgr.isolation_level(tx_si),
836            Some(IsolationLevel::SnapshotIsolation)
837        );
838        assert_eq!(
839            mgr.isolation_level(tx_ser),
840            Some(IsolationLevel::Serializable)
841        );
842    }
843
844    #[test]
845    fn test_ssi_read_write_conflict_detected() {
846        let mgr = TransactionManager::new();
847
848        // tx1 starts with Serializable isolation
849        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
850
851        // tx2 starts and will modify an entity
852        let tx2 = mgr.begin();
853
854        // tx1 reads entity 42
855        let entity = NodeId::new(42);
856        mgr.record_read(tx1, entity).unwrap();
857
858        // tx2 writes to the same entity and commits
859        mgr.record_write(tx2, entity).unwrap();
860        mgr.commit(tx2).unwrap();
861
862        // tx1 tries to commit - should fail due to SSI read-write conflict
863        let result = mgr.commit(tx1);
864        assert!(result.is_err());
865        assert!(
866            result
867                .unwrap_err()
868                .to_string()
869                .contains("Serialization failure"),
870            "Expected serialization failure error"
871        );
872    }
873
874    #[test]
875    fn test_ssi_no_conflict_when_not_serializable() {
876        let mgr = TransactionManager::new();
877
878        // tx1 starts with default Snapshot Isolation
879        let tx1 = mgr.begin();
880
881        // tx2 starts and will modify an entity
882        let tx2 = mgr.begin();
883
884        // tx1 reads entity 42
885        let entity = NodeId::new(42);
886        mgr.record_read(tx1, entity).unwrap();
887
888        // tx2 writes to the same entity and commits
889        mgr.record_write(tx2, entity).unwrap();
890        mgr.commit(tx2).unwrap();
891
892        // tx1 should commit successfully (SI doesn't check read-write conflicts)
893        let result = mgr.commit(tx1);
894        assert!(
895            result.is_ok(),
896            "Snapshot Isolation should not detect read-write conflicts"
897        );
898    }
899
900    #[test]
901    fn test_ssi_no_conflict_when_write_before_read() {
902        let mgr = TransactionManager::new();
903
904        // tx1 writes and commits first
905        let tx1 = mgr.begin();
906        let entity = NodeId::new(42);
907        mgr.record_write(tx1, entity).unwrap();
908        mgr.commit(tx1).unwrap();
909
910        // tx2 starts AFTER tx1 committed and reads the entity
911        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
912        mgr.record_read(tx2, entity).unwrap();
913
914        // tx2 should commit successfully (tx1 committed before tx2 started)
915        let result = mgr.commit(tx2);
916        assert!(
917            result.is_ok(),
918            "Should not conflict when writer committed before reader started"
919        );
920    }
921
922    #[test]
923    fn test_write_skew_prevented_by_ssi() {
924        // Classic write skew scenario:
925        // Account A = 50, Account B = 50, constraint: A + B >= 0
926        // T1 reads A, B, writes A = A - 100
927        // T2 reads A, B, writes B = B - 100
928        // Without SSI, both could commit violating the constraint.
929
930        let mgr = TransactionManager::new();
931
932        let account_a = NodeId::new(1);
933        let account_b = NodeId::new(2);
934
935        // T1 and T2 both start with Serializable isolation
936        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
937        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
938
939        // Both read both accounts
940        mgr.record_read(tx1, account_a).unwrap();
941        mgr.record_read(tx1, account_b).unwrap();
942        mgr.record_read(tx2, account_a).unwrap();
943        mgr.record_read(tx2, account_b).unwrap();
944
945        // T1 writes to A, T2 writes to B (no write-write conflict)
946        mgr.record_write(tx1, account_a).unwrap();
947        mgr.record_write(tx2, account_b).unwrap();
948
949        // T1 commits first
950        let result1 = mgr.commit(tx1);
951        assert!(result1.is_ok(), "First commit should succeed");
952
953        // T2 tries to commit - should fail because it read account_a which T1 wrote
954        let result2 = mgr.commit(tx2);
955        assert!(result2.is_err(), "Second commit should fail due to SSI");
956        assert!(
957            result2
958                .unwrap_err()
959                .to_string()
960                .contains("Serialization failure"),
961            "Expected serialization failure error for write skew prevention"
962        );
963    }
964
965    #[test]
966    fn test_read_committed_allows_non_repeatable_reads() {
967        let mgr = TransactionManager::new();
968
969        // tx1 starts with ReadCommitted isolation
970        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
971        let entity = NodeId::new(42);
972
973        // tx1 reads entity
974        mgr.record_read(tx1, entity).unwrap();
975
976        // tx2 writes and commits
977        let tx2 = mgr.begin();
978        mgr.record_write(tx2, entity).unwrap();
979        mgr.commit(tx2).unwrap();
980
981        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
982        let result = mgr.commit(tx1);
983        assert!(
984            result.is_ok(),
985            "ReadCommitted should allow non-repeatable reads"
986        );
987    }
988
989    #[test]
990    fn test_isolation_level_debug() {
991        assert_eq!(
992            format!("{:?}", IsolationLevel::ReadCommitted),
993            "ReadCommitted"
994        );
995        assert_eq!(
996            format!("{:?}", IsolationLevel::SnapshotIsolation),
997            "SnapshotIsolation"
998        );
999        assert_eq!(
1000            format!("{:?}", IsolationLevel::Serializable),
1001            "Serializable"
1002        );
1003    }
1004
1005    #[test]
1006    fn test_isolation_level_default_trait() {
1007        let default: IsolationLevel = Default::default();
1008        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1009    }
1010
1011    #[test]
1012    fn test_ssi_concurrent_reads_no_conflict() {
1013        let mgr = TransactionManager::new();
1014
1015        let entity = NodeId::new(42);
1016
1017        // Both transactions read the same entity
1018        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1019        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1020
1021        mgr.record_read(tx1, entity).unwrap();
1022        mgr.record_read(tx2, entity).unwrap();
1023
1024        // Both should commit successfully (read-read is not a conflict)
1025        assert!(mgr.commit(tx1).is_ok());
1026        assert!(mgr.commit(tx2).is_ok());
1027    }
1028
1029    #[test]
1030    fn test_ssi_write_write_conflict() {
1031        let mgr = TransactionManager::new();
1032
1033        let entity = NodeId::new(42);
1034
1035        // Both transactions write the same entity
1036        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1037        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1038
1039        mgr.record_write(tx1, entity).unwrap();
1040        mgr.record_write(tx2, entity).unwrap();
1041
1042        // First commit succeeds
1043        assert!(mgr.commit(tx1).is_ok());
1044
1045        // Second commit fails (write-write conflict)
1046        let result = mgr.commit(tx2);
1047        assert!(result.is_err());
1048    }
1049}