Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction has been started and has not finished yet.
    Active,
    /// The transaction finished by committing.
    Committed,
    /// The transaction finished by aborting.
    Aborted,
}
21
/// Isolation level a transaction runs under.
///
/// Each level trades consistency guarantees against bookkeeping cost.
///
/// # Comparison
///
/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
/// |-------|-------------|----------------------|---------------|------------|
/// | ReadCommitted | No | Yes | Yes | Yes |
/// | SnapshotIsolation | No | No | No | Yes |
/// | Serializable | No | No | No | No |
///
/// # Performance
///
/// The stronger the level, the more state has to be tracked:
/// - `ReadCommitted`: writes only
/// - `SnapshotIsolation`: writes plus snapshot versioning
/// - `Serializable`: writes, reads, and SSI validation
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IsolationLevel {
    /// Read Committed: only committed data is visible, but the same row may
    /// be seen in different versions during one transaction.
    ///
    /// Cheapest level with the highest throughput and the weakest consistency.
    ReadCommitted,

    /// Snapshot Isolation (the default): the transaction reads from a
    /// consistent snapshot taken when it started, which rules out
    /// non-repeatable reads and phantom reads.
    ///
    /// Write skew is still possible at this level.
    #[default]
    SnapshotIsolation,

    /// Serializable Snapshot Isolation (SSI): full serializability, achieved
    /// by detecting read-write conflicts on top of write-write conflicts.
    ///
    /// Rules out every anomaly, write skew included, at the price of more
    /// aborted transactions because conflict detection is stricter.
    Serializable,
}
63
64/// Entity identifier for write tracking.
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum EntityId {
67    /// A node.
68    Node(NodeId),
69    /// An edge.
70    Edge(EdgeId),
71}
72
73impl From<NodeId> for EntityId {
74    fn from(id: NodeId) -> Self {
75        Self::Node(id)
76    }
77}
78
79impl From<EdgeId> for EntityId {
80    fn from(id: EdgeId) -> Self {
81        Self::Edge(id)
82    }
83}
84
85/// Information about an active transaction.
86pub struct TransactionInfo {
87    /// Transaction state.
88    pub state: TransactionState,
89    /// Isolation level for this transaction.
90    pub isolation_level: IsolationLevel,
91    /// Start epoch (snapshot epoch for reads).
92    pub start_epoch: EpochId,
93    /// Set of entities written by this transaction.
94    pub write_set: HashSet<EntityId>,
95    /// Set of entities read by this transaction (for serializable isolation).
96    pub read_set: HashSet<EntityId>,
97}
98
99impl TransactionInfo {
100    /// Creates a new transaction info with the given isolation level.
101    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102        Self {
103            state: TransactionState::Active,
104            isolation_level,
105            start_epoch,
106            write_set: HashSet::new(),
107            read_set: HashSet::new(),
108        }
109    }
110}
111
/// Manages transactions and MVCC versioning.
///
/// Hands out transaction IDs, tracks the read and write sets of each
/// transaction, advances the epoch on commit, and performs conflict
/// detection when writes are recorded and when transactions commit.
///
/// Lock ordering: methods that need both maps acquire `transactions`
/// before `committed_epochs`.
pub struct TransactionManager {
    /// Next transaction ID to hand out.
    next_transaction_id: AtomicU64,
    /// Current epoch. Incremented by one on every successful commit and
    /// raised by `sync_epoch`.
    current_epoch: AtomicU64,
    /// Number of currently active transactions (for fast-path conflict skip).
    /// Incremented on begin, decremented on commit and abort.
    active_count: AtomicU64,
    /// Tracked transactions. Despite the historical name this holds active,
    /// committed and aborted transactions until `gc` removes them.
    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
    /// Committed transaction epochs (for conflict detection).
    /// Maps TransactionId -> commit epoch.
    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
}
126
127impl TransactionManager {
128    /// Creates a new transaction manager.
129    #[must_use]
130    pub fn new() -> Self {
131        Self {
132            // Start at 2 to avoid collision with TransactionId::SYSTEM (which is 1)
133            // TransactionId::INVALID = u64::MAX, TransactionId::SYSTEM = 1, user transactions start at 2
134            next_transaction_id: AtomicU64::new(2),
135            current_epoch: AtomicU64::new(0),
136            active_count: AtomicU64::new(0),
137            transactions: RwLock::new(FxHashMap::default()),
138            committed_epochs: RwLock::new(FxHashMap::default()),
139        }
140    }
141
142    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
143    pub fn begin(&self) -> TransactionId {
144        self.begin_with_isolation(IsolationLevel::default())
145    }
146
147    /// Begins a new transaction with the specified isolation level.
148    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
149        let transaction_id =
150            TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
151        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
152
153        let info = TransactionInfo::new(epoch, isolation_level);
154        self.transactions.write().insert(transaction_id, info);
155        self.active_count.fetch_add(1, Ordering::Relaxed);
156        transaction_id
157    }
158
159    /// Returns the isolation level of a transaction.
160    pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
161        self.transactions
162            .read()
163            .get(&transaction_id)
164            .map(|info| info.isolation_level)
165    }
166
167    /// Records a write operation for the transaction.
168    ///
169    /// Uses first-writer-wins: if another active transaction has already
170    /// written to the same entity, returns a write-write conflict error
171    /// immediately (before the caller mutates the store).
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the transaction is not active or if another
176    /// active transaction has already written to the same entity.
177    pub fn record_write(
178        &self,
179        transaction_id: TransactionId,
180        entity: impl Into<EntityId>,
181    ) -> Result<()> {
182        let entity = entity.into();
183        let mut txns = self.transactions.write();
184        let info = txns.get(&transaction_id).ok_or_else(|| {
185            Error::Transaction(TransactionError::InvalidState(
186                "Transaction not found".to_string(),
187            ))
188        })?;
189
190        if info.state != TransactionState::Active {
191            return Err(Error::Transaction(TransactionError::InvalidState(
192                "Transaction is not active".to_string(),
193            )));
194        }
195
196        // First-writer-wins: reject if another active transaction already
197        // wrote to the same entity. This prevents interleaved PENDING
198        // entries in VersionLogs that cannot be rolled back per-transaction.
199        // Skip the scan when only one transaction is active (common case for
200        // auto-commit): there is nobody to conflict with.
201        if self.active_count.load(Ordering::Relaxed) > 1 {
202            for (other_tx, other_info) in txns.iter() {
203                if *other_tx != transaction_id
204                    && other_info.state == TransactionState::Active
205                    && other_info.write_set.contains(&entity)
206                {
207                    return Err(Error::Transaction(TransactionError::WriteConflict(
208                        format!("Write-write conflict on entity {entity:?}"),
209                    )));
210                }
211            }
212        }
213
214        // Safe to record: re-borrow mutably
215        let info = txns.get_mut(&transaction_id).expect("checked above");
216        info.write_set.insert(entity);
217        Ok(())
218    }
219
220    /// Records a read operation for the transaction (for serializable isolation).
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the transaction is not active.
225    pub fn record_read(
226        &self,
227        transaction_id: TransactionId,
228        entity: impl Into<EntityId>,
229    ) -> Result<()> {
230        let mut txns = self.transactions.write();
231        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
232            Error::Transaction(TransactionError::InvalidState(
233                "Transaction not found".to_string(),
234            ))
235        })?;
236
237        if info.state != TransactionState::Active {
238            return Err(Error::Transaction(TransactionError::InvalidState(
239                "Transaction is not active".to_string(),
240            )));
241        }
242
243        info.read_set.insert(entity.into());
244        Ok(())
245    }
246
247    /// Commits a transaction with conflict detection.
248    ///
249    /// # Conflict Detection
250    ///
251    /// - **All isolation levels**: Write-write conflicts (two transactions writing
252    ///   to the same entity) are always detected and cause the second committer to abort.
253    ///
254    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
255    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
256    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if:
261    /// - The transaction is not active
262    /// - There's a write-write conflict with another committed transaction
263    /// - (Serializable only) There's a read-write conflict (SSI violation)
264    pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
265        let mut txns = self.transactions.write();
266        let committed = self.committed_epochs.read();
267
268        // First, validate the transaction exists and is active
269        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
270            let info = txns.get(&transaction_id).ok_or_else(|| {
271                Error::Transaction(TransactionError::InvalidState(
272                    "Transaction not found".to_string(),
273                ))
274            })?;
275
276            if info.state != TransactionState::Active {
277                return Err(Error::Transaction(TransactionError::InvalidState(
278                    "Transaction is not active".to_string(),
279                )));
280            }
281
282            (
283                info.isolation_level,
284                info.start_epoch,
285                info.write_set.clone(),
286                info.read_set.clone(),
287            )
288        };
289
290        // Check for write-write conflicts with transactions that committed
291        // after our snapshot (i.e., concurrent writers to the same entities).
292        // Transactions committed before our start_epoch are part of our visible
293        // snapshot, so overwriting their values is not a conflict.
294        for (other_tx, commit_epoch) in committed.iter() {
295            if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
296                // Check if that transaction wrote to any of our entities
297                if let Some(other_info) = txns.get(other_tx) {
298                    for entity in &our_write_set {
299                        if other_info.write_set.contains(entity) {
300                            return Err(Error::Transaction(TransactionError::WriteConflict(
301                                format!("Write-write conflict on entity {:?}", entity),
302                            )));
303                        }
304                    }
305                }
306            }
307        }
308
309        // SSI validation for Serializable isolation level
310        // Check for read-write conflicts: if we read an entity that another
311        // transaction (that committed after we started) wrote, we have a
312        // "rw-antidependency" which can cause write skew.
313        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
314            for (other_tx, commit_epoch) in committed.iter() {
315                if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
316                    // Check if that transaction wrote to any entity we read
317                    if let Some(other_info) = txns.get(other_tx) {
318                        for entity in &our_read_set {
319                            if other_info.write_set.contains(entity) {
320                                return Err(Error::Transaction(
321                                    TransactionError::SerializationFailure(format!(
322                                        "Read-write conflict on entity {:?}: \
323                                         another transaction modified data we read",
324                                        entity
325                                    )),
326                                ));
327                            }
328                        }
329                    }
330                }
331            }
332
333            // Also check against transactions that are already marked committed
334            // but not yet in committed_epochs map
335            for (other_tx, other_info) in txns.iter() {
336                if *other_tx == transaction_id {
337                    continue;
338                }
339                if other_info.state == TransactionState::Committed {
340                    // If we can see their write set and we read something they wrote
341                    for entity in &our_read_set {
342                        if other_info.write_set.contains(entity) {
343                            // Check if they committed after we started
344                            if let Some(commit_epoch) = committed.get(other_tx)
345                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
346                            {
347                                return Err(Error::Transaction(
348                                    TransactionError::SerializationFailure(format!(
349                                        "Read-write conflict on entity {:?}: \
350                                             another transaction modified data we read",
351                                        entity
352                                    )),
353                                ));
354                            }
355                        }
356                    }
357                }
358            }
359        }
360
361        // Commit successful - advance epoch atomically
362        // SeqCst ensures all threads see commits in a consistent total order
363        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
364
365        // Now update state
366        if let Some(info) = txns.get_mut(&transaction_id) {
367            info.state = TransactionState::Committed;
368        }
369        self.active_count.fetch_sub(1, Ordering::Relaxed);
370
371        // Record commit epoch (need to drop read lock first)
372        drop(committed);
373        self.committed_epochs
374            .write()
375            .insert(transaction_id, commit_epoch);
376
377        Ok(commit_epoch)
378    }
379
380    /// Aborts a transaction.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the transaction is not active.
385    pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
386        let mut txns = self.transactions.write();
387
388        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
389            Error::Transaction(TransactionError::InvalidState(
390                "Transaction not found".to_string(),
391            ))
392        })?;
393
394        if info.state != TransactionState::Active {
395            return Err(Error::Transaction(TransactionError::InvalidState(
396                "Transaction is not active".to_string(),
397            )));
398        }
399
400        info.state = TransactionState::Aborted;
401        self.active_count.fetch_sub(1, Ordering::Relaxed);
402        Ok(())
403    }
404
405    /// Returns the write set of a transaction.
406    ///
407    /// This returns a copy of the entities written by this transaction,
408    /// used for rollback to discard uncommitted versions.
409    pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
410        let txns = self.transactions.read();
411        let info = txns.get(&transaction_id).ok_or_else(|| {
412            Error::Transaction(TransactionError::InvalidState(
413                "Transaction not found".to_string(),
414            ))
415        })?;
416        Ok(info.write_set.clone())
417    }
418
419    /// Replaces the write set of a transaction (used for savepoint rollback).
420    ///
421    /// # Errors
422    ///
423    /// Returns an error if the transaction is not found.
424    pub fn reset_write_set(
425        &self,
426        transaction_id: TransactionId,
427        write_set: HashSet<EntityId>,
428    ) -> Result<()> {
429        let mut txns = self.transactions.write();
430        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
431            Error::Transaction(TransactionError::InvalidState(
432                "Transaction not found".to_string(),
433            ))
434        })?;
435        info.write_set = write_set;
436        Ok(())
437    }
438
439    /// Aborts all active transactions.
440    ///
441    /// Used during database shutdown.
442    pub fn abort_all_active(&self) {
443        let mut txns = self.transactions.write();
444        for info in txns.values_mut() {
445            if info.state == TransactionState::Active {
446                info.state = TransactionState::Aborted;
447                self.active_count.fetch_sub(1, Ordering::Relaxed);
448            }
449        }
450    }
451
452    /// Returns the state of a transaction.
453    pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
454        self.transactions
455            .read()
456            .get(&transaction_id)
457            .map(|info| info.state)
458    }
459
460    /// Returns the start epoch of a transaction.
461    pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
462        self.transactions
463            .read()
464            .get(&transaction_id)
465            .map(|info| info.start_epoch)
466    }
467
468    /// Returns the current epoch.
469    #[must_use]
470    pub fn current_epoch(&self) -> EpochId {
471        EpochId::new(self.current_epoch.load(Ordering::Acquire))
472    }
473
474    /// Synchronizes the epoch counter to at least the given value.
475    ///
476    /// Used after snapshot import and WAL recovery to align the
477    /// TransactionManager epoch with the store epoch.
478    pub fn sync_epoch(&self, epoch: EpochId) {
479        self.current_epoch
480            .fetch_max(epoch.as_u64(), Ordering::SeqCst);
481    }
482
483    /// Returns the minimum epoch that must be preserved for active transactions.
484    ///
485    /// This is used for garbage collection - versions visible at this epoch
486    /// must be preserved.
487    #[must_use]
488    pub fn min_active_epoch(&self) -> EpochId {
489        let txns = self.transactions.read();
490        txns.values()
491            .filter(|info| info.state == TransactionState::Active)
492            .map(|info| info.start_epoch)
493            .min()
494            .unwrap_or_else(|| self.current_epoch())
495    }
496
497    /// Returns the number of active transactions.
498    #[must_use]
499    pub fn active_count(&self) -> usize {
500        self.transactions
501            .read()
502            .values()
503            .filter(|info| info.state == TransactionState::Active)
504            .count()
505    }
506
507    /// Cleans up completed transactions that are no longer needed for conflict detection.
508    ///
509    /// A committed transaction's write set must be preserved until all transactions
510    /// that started before its commit have completed. This ensures write-write
511    /// conflict detection works correctly.
512    ///
513    /// Returns the number of transactions cleaned up.
514    pub fn gc(&self) -> usize {
515        let mut txns = self.transactions.write();
516        let mut committed = self.committed_epochs.write();
517
518        // Find the minimum start epoch among active transactions
519        let min_active_start = txns
520            .values()
521            .filter(|info| info.state == TransactionState::Active)
522            .map(|info| info.start_epoch)
523            .min();
524
525        let initial_count = txns.len();
526
527        // Collect transactions safe to remove
528        let to_remove: Vec<TransactionId> = txns
529            .iter()
530            .filter(|(transaction_id, info)| {
531                match info.state {
532                    TransactionState::Active => false, // Never remove active transactions
533                    TransactionState::Aborted => true, // Always safe to remove aborted transactions
534                    TransactionState::Committed => {
535                        // Only remove committed transactions if their commit epoch
536                        // is older than all active transactions' start epochs
537                        if let Some(min_start) = min_active_start {
538                            if let Some(commit_epoch) = committed.get(*transaction_id) {
539                                // Safe to remove if committed before all active txns started
540                                commit_epoch.as_u64() < min_start.as_u64()
541                            } else {
542                                // No commit epoch recorded, keep it to be safe
543                                false
544                            }
545                        } else {
546                            // No active transactions, safe to remove all committed
547                            true
548                        }
549                    }
550                }
551            })
552            .map(|(id, _)| *id)
553            .collect();
554
555        for id in &to_remove {
556            txns.remove(id);
557            committed.remove(id);
558        }
559
560        initial_count - txns.len()
561    }
562
563    /// Marks a transaction as committed at a specific epoch.
564    ///
565    /// Used during recovery to restore transaction state.
566    pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
567        self.committed_epochs.write().insert(transaction_id, epoch);
568    }
569
570    /// Returns the last assigned transaction ID.
571    ///
572    /// Returns `None` if no transactions have been started yet.
573    #[must_use]
574    pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
575        let next = self.next_transaction_id.load(Ordering::Relaxed);
576        if next > 1 {
577            Some(TransactionId::new(next - 1))
578        } else {
579            None
580        }
581    }
582}
583
584impl Default for TransactionManager {
585    fn default() -> Self {
586        Self::new()
587    }
588}
589
590#[cfg(test)]
591mod tests {
592    use super::*;
593
594    #[test]
595    fn test_begin_commit() {
596        let mgr = TransactionManager::new();
597
598        let tx = mgr.begin();
599        assert_eq!(mgr.state(tx), Some(TransactionState::Active));
600
601        let commit_epoch = mgr.commit(tx).unwrap();
602        assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
603        assert!(commit_epoch.as_u64() > 0);
604    }
605
606    #[test]
607    fn test_begin_abort() {
608        let mgr = TransactionManager::new();
609
610        let tx = mgr.begin();
611        mgr.abort(tx).unwrap();
612        assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
613    }
614
615    #[test]
616    fn test_epoch_advancement() {
617        let mgr = TransactionManager::new();
618
619        let initial_epoch = mgr.current_epoch();
620
621        let tx = mgr.begin();
622        let commit_epoch = mgr.commit(tx).unwrap();
623
624        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
625        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
626    }
627
628    #[test]
629    fn test_gc_preserves_needed_write_sets() {
630        let mgr = TransactionManager::new();
631
632        let tx1 = mgr.begin();
633        let tx2 = mgr.begin();
634
635        mgr.commit(tx1).unwrap();
636        // tx2 still active - started before tx1 committed
637
638        assert_eq!(mgr.active_count(), 1);
639
640        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
641        let cleaned = mgr.gc();
642        assert_eq!(cleaned, 0);
643
644        // Both transactions should remain
645        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
646        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
647    }
648
649    #[test]
650    fn test_gc_removes_old_commits() {
651        let mgr = TransactionManager::new();
652
653        // tx1 commits at epoch 1
654        let tx1 = mgr.begin();
655        mgr.commit(tx1).unwrap();
656
657        // tx2 starts at epoch 1, commits at epoch 2
658        let tx2 = mgr.begin();
659        mgr.commit(tx2).unwrap();
660
661        // tx3 starts at epoch 2
662        let tx3 = mgr.begin();
663
664        // At this point:
665        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
666        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
667        let cleaned = mgr.gc();
668        assert_eq!(cleaned, 1); // Only tx1 removed
669
670        assert_eq!(mgr.state(tx1), None);
671        assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); // Preserved for conflict detection
672        assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
673
674        // After tx3 commits, tx2 can be GC'd
675        mgr.commit(tx3).unwrap();
676        let cleaned = mgr.gc();
677        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
678    }
679
680    #[test]
681    fn test_gc_removes_aborted() {
682        let mgr = TransactionManager::new();
683
684        let tx1 = mgr.begin();
685        let tx2 = mgr.begin();
686
687        mgr.abort(tx1).unwrap();
688        // tx2 still active
689
690        // Aborted transactions are always safe to remove
691        let cleaned = mgr.gc();
692        assert_eq!(cleaned, 1);
693
694        assert_eq!(mgr.state(tx1), None);
695        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
696    }
697
698    #[test]
699    fn test_write_tracking() {
700        let mgr = TransactionManager::new();
701
702        let tx = mgr.begin();
703
704        // Record writes
705        mgr.record_write(tx, NodeId::new(1)).unwrap();
706        mgr.record_write(tx, NodeId::new(2)).unwrap();
707        mgr.record_write(tx, EdgeId::new(100)).unwrap();
708
709        // Should commit successfully (no conflicts)
710        assert!(mgr.commit(tx).is_ok());
711    }
712
713    #[test]
714    fn test_min_active_epoch() {
715        let mgr = TransactionManager::new();
716
717        // No active transactions - should return current epoch
718        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
719
720        // Start some transactions
721        let tx1 = mgr.begin();
722        let epoch1 = mgr.start_epoch(tx1).unwrap();
723
724        // Advance epoch
725        let tx2 = mgr.begin();
726        mgr.commit(tx2).unwrap();
727
728        let _tx3 = mgr.begin();
729
730        // min_active_epoch should be tx1's start epoch (earliest active)
731        assert_eq!(mgr.min_active_epoch(), epoch1);
732    }
733
734    #[test]
735    fn test_abort_all_active() {
736        let mgr = TransactionManager::new();
737
738        let tx1 = mgr.begin();
739        let tx2 = mgr.begin();
740        let tx3 = mgr.begin();
741
742        mgr.commit(tx1).unwrap();
743        // tx2 and tx3 still active
744
745        mgr.abort_all_active();
746
747        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); // Already committed
748        assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
749        assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
750    }
751
752    #[test]
753    fn test_start_epoch_snapshot() {
754        let mgr = TransactionManager::new();
755
756        // Start epoch for tx1
757        let tx1 = mgr.begin();
758        let start1 = mgr.start_epoch(tx1).unwrap();
759
760        // Commit tx1, advancing epoch
761        mgr.commit(tx1).unwrap();
762
763        // Start tx2 after epoch advanced
764        let tx2 = mgr.begin();
765        let start2 = mgr.start_epoch(tx2).unwrap();
766
767        // tx2 should have a later start epoch
768        assert!(start2.as_u64() > start1.as_u64());
769    }
770
771    #[test]
772    fn test_write_write_conflict_detection() {
773        let mgr = TransactionManager::new();
774
775        // Both transactions start at the same epoch
776        let tx1 = mgr.begin();
777        let tx2 = mgr.begin();
778
779        // First writer succeeds
780        let entity = NodeId::new(42);
781        mgr.record_write(tx1, entity).unwrap();
782
783        // Second writer is rejected immediately (first-writer-wins)
784        let result = mgr.record_write(tx2, entity);
785        assert!(result.is_err());
786        assert!(
787            result
788                .unwrap_err()
789                .to_string()
790                .contains("Write-write conflict"),
791            "Expected write-write conflict error"
792        );
793
794        // First commit succeeds (no conflict at commit time either)
795        let result1 = mgr.commit(tx1);
796        assert!(result1.is_ok());
797    }
798
799    #[test]
800    fn test_commit_epoch_monotonicity() {
801        let mgr = TransactionManager::new();
802
803        let mut epochs = Vec::new();
804
805        // Commit multiple transactions and verify epochs are strictly increasing
806        for _ in 0..10 {
807            let tx = mgr.begin();
808            let epoch = mgr.commit(tx).unwrap();
809            epochs.push(epoch.as_u64());
810        }
811
812        // Verify strict monotonicity
813        for i in 1..epochs.len() {
814            assert!(
815                epochs[i] > epochs[i - 1],
816                "Epoch {} ({}) should be greater than epoch {} ({})",
817                i,
818                epochs[i],
819                i - 1,
820                epochs[i - 1]
821            );
822        }
823    }
824
825    #[test]
826    fn test_concurrent_commits_via_threads() {
827        use std::sync::Arc;
828        use std::thread;
829
830        let mgr = Arc::new(TransactionManager::new());
831        let num_threads = 10;
832        let commits_per_thread = 100;
833
834        let handles: Vec<_> = (0..num_threads)
835            .map(|_| {
836                let mgr = Arc::clone(&mgr);
837                thread::spawn(move || {
838                    let mut epochs = Vec::new();
839                    for _ in 0..commits_per_thread {
840                        let tx = mgr.begin();
841                        let epoch = mgr.commit(tx).unwrap();
842                        epochs.push(epoch.as_u64());
843                    }
844                    epochs
845                })
846            })
847            .collect();
848
849        let mut all_epochs: Vec<u64> = handles
850            .into_iter()
851            .flat_map(|h| h.join().unwrap())
852            .collect();
853
854        // All epochs should be unique (no duplicates)
855        all_epochs.sort_unstable();
856        let unique_count = all_epochs.len();
857        all_epochs.dedup();
858        assert_eq!(
859            all_epochs.len(),
860            unique_count,
861            "All commit epochs should be unique"
862        );
863
864        // Final epoch should equal number of commits
865        assert_eq!(
866            mgr.current_epoch().as_u64(),
867            (num_threads * commits_per_thread) as u64,
868            "Final epoch should equal total commits"
869        );
870    }
871
872    #[test]
873    fn test_isolation_level_default() {
874        let mgr = TransactionManager::new();
875
876        let tx = mgr.begin();
877        assert_eq!(
878            mgr.isolation_level(tx),
879            Some(IsolationLevel::SnapshotIsolation)
880        );
881    }
882
883    #[test]
884    fn test_isolation_level_explicit() {
885        let mgr = TransactionManager::new();
886
887        let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
888        let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
889        let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
890
891        assert_eq!(
892            mgr.isolation_level(transaction_rc),
893            Some(IsolationLevel::ReadCommitted)
894        );
895        assert_eq!(
896            mgr.isolation_level(transaction_si),
897            Some(IsolationLevel::SnapshotIsolation)
898        );
899        assert_eq!(
900            mgr.isolation_level(transaction_ser),
901            Some(IsolationLevel::Serializable)
902        );
903    }
904
905    #[test]
906    fn test_ssi_read_write_conflict_detected() {
907        let mgr = TransactionManager::new();
908
909        // tx1 starts with Serializable isolation
910        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
911
912        // tx2 starts and will modify an entity
913        let tx2 = mgr.begin();
914
915        // tx1 reads entity 42
916        let entity = NodeId::new(42);
917        mgr.record_read(tx1, entity).unwrap();
918
919        // tx2 writes to the same entity and commits
920        mgr.record_write(tx2, entity).unwrap();
921        mgr.commit(tx2).unwrap();
922
923        // tx1 tries to commit - should fail due to SSI read-write conflict
924        let result = mgr.commit(tx1);
925        assert!(result.is_err());
926        assert!(
927            result
928                .unwrap_err()
929                .to_string()
930                .contains("Serialization failure"),
931            "Expected serialization failure error"
932        );
933    }
934
935    #[test]
936    fn test_ssi_no_conflict_when_not_serializable() {
937        let mgr = TransactionManager::new();
938
939        // tx1 starts with default Snapshot Isolation
940        let tx1 = mgr.begin();
941
942        // tx2 starts and will modify an entity
943        let tx2 = mgr.begin();
944
945        // tx1 reads entity 42
946        let entity = NodeId::new(42);
947        mgr.record_read(tx1, entity).unwrap();
948
949        // tx2 writes to the same entity and commits
950        mgr.record_write(tx2, entity).unwrap();
951        mgr.commit(tx2).unwrap();
952
953        // tx1 should commit successfully (SI doesn't check read-write conflicts)
954        let result = mgr.commit(tx1);
955        assert!(
956            result.is_ok(),
957            "Snapshot Isolation should not detect read-write conflicts"
958        );
959    }
960
961    #[test]
962    fn test_ssi_no_conflict_when_write_before_read() {
963        let mgr = TransactionManager::new();
964
965        // tx1 writes and commits first
966        let tx1 = mgr.begin();
967        let entity = NodeId::new(42);
968        mgr.record_write(tx1, entity).unwrap();
969        mgr.commit(tx1).unwrap();
970
971        // tx2 starts AFTER tx1 committed and reads the entity
972        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
973        mgr.record_read(tx2, entity).unwrap();
974
975        // tx2 should commit successfully (tx1 committed before tx2 started)
976        let result = mgr.commit(tx2);
977        assert!(
978            result.is_ok(),
979            "Should not conflict when writer committed before reader started"
980        );
981    }
982
983    #[test]
984    fn test_write_skew_prevented_by_ssi() {
985        // Classic write skew scenario:
986        // Account A = 50, Account B = 50, constraint: A + B >= 0
987        // T1 reads A, B, writes A = A - 100
988        // T2 reads A, B, writes B = B - 100
989        // Without SSI, both could commit violating the constraint.
990
991        let mgr = TransactionManager::new();
992
993        let account_a = NodeId::new(1);
994        let account_b = NodeId::new(2);
995
996        // T1 and T2 both start with Serializable isolation
997        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
998        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
999
1000        // Both read both accounts
1001        mgr.record_read(tx1, account_a).unwrap();
1002        mgr.record_read(tx1, account_b).unwrap();
1003        mgr.record_read(tx2, account_a).unwrap();
1004        mgr.record_read(tx2, account_b).unwrap();
1005
1006        // T1 writes to A, T2 writes to B (no write-write conflict)
1007        mgr.record_write(tx1, account_a).unwrap();
1008        mgr.record_write(tx2, account_b).unwrap();
1009
1010        // T1 commits first
1011        let result1 = mgr.commit(tx1);
1012        assert!(result1.is_ok(), "First commit should succeed");
1013
1014        // T2 tries to commit - should fail because it read account_a which T1 wrote
1015        let result2 = mgr.commit(tx2);
1016        assert!(result2.is_err(), "Second commit should fail due to SSI");
1017        assert!(
1018            result2
1019                .unwrap_err()
1020                .to_string()
1021                .contains("Serialization failure"),
1022            "Expected serialization failure error for write skew prevention"
1023        );
1024    }
1025
1026    #[test]
1027    fn test_read_committed_allows_non_repeatable_reads() {
1028        let mgr = TransactionManager::new();
1029
1030        // tx1 starts with ReadCommitted isolation
1031        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1032        let entity = NodeId::new(42);
1033
1034        // tx1 reads entity
1035        mgr.record_read(tx1, entity).unwrap();
1036
1037        // tx2 writes and commits
1038        let tx2 = mgr.begin();
1039        mgr.record_write(tx2, entity).unwrap();
1040        mgr.commit(tx2).unwrap();
1041
1042        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
1043        let result = mgr.commit(tx1);
1044        assert!(
1045            result.is_ok(),
1046            "ReadCommitted should allow non-repeatable reads"
1047        );
1048    }
1049
1050    #[test]
1051    fn test_isolation_level_debug() {
1052        assert_eq!(
1053            format!("{:?}", IsolationLevel::ReadCommitted),
1054            "ReadCommitted"
1055        );
1056        assert_eq!(
1057            format!("{:?}", IsolationLevel::SnapshotIsolation),
1058            "SnapshotIsolation"
1059        );
1060        assert_eq!(
1061            format!("{:?}", IsolationLevel::Serializable),
1062            "Serializable"
1063        );
1064    }
1065
1066    #[test]
1067    fn test_isolation_level_default_trait() {
1068        let default: IsolationLevel = Default::default();
1069        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1070    }
1071
1072    #[test]
1073    fn test_ssi_concurrent_reads_no_conflict() {
1074        let mgr = TransactionManager::new();
1075
1076        let entity = NodeId::new(42);
1077
1078        // Both transactions read the same entity
1079        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1080        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1081
1082        mgr.record_read(tx1, entity).unwrap();
1083        mgr.record_read(tx2, entity).unwrap();
1084
1085        // Both should commit successfully (read-read is not a conflict)
1086        assert!(mgr.commit(tx1).is_ok());
1087        assert!(mgr.commit(tx2).is_ok());
1088    }
1089
1090    #[test]
1091    fn test_ssi_write_write_conflict() {
1092        let mgr = TransactionManager::new();
1093
1094        let entity = NodeId::new(42);
1095
1096        // Both transactions attempt to write the same entity
1097        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1098        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1099
1100        // First writer succeeds
1101        mgr.record_write(tx1, entity).unwrap();
1102
1103        // Second writer is rejected immediately (first-writer-wins)
1104        let result = mgr.record_write(tx2, entity);
1105        assert!(
1106            result.is_err(),
1107            "Second record_write should fail with write-write conflict"
1108        );
1109
1110        // First commit succeeds
1111        assert!(mgr.commit(tx1).is_ok());
1112    }
1113}