Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the `TransactionManager`.
///
/// A transaction starts out `Active` and moves exactly once to either
/// `Committed` or `Aborted`; the manager rejects commit/abort/record calls
/// on a transaction that is no longer `Active`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// Transaction is active: it may still record reads/writes, commit or abort.
    Active,
    /// Transaction is committed (it passed conflict validation).
    Committed,
    /// Transaction is aborted (explicitly, or via `abort_all_active`).
    Aborted,
}
21
/// Transaction isolation level.
///
/// Controls the consistency guarantees and performance tradeoffs for transactions.
///
/// # Comparison
///
/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
/// |-------|-------------|----------------------|---------------|------------|
/// | ReadCommitted | No | Yes | Yes | Yes |
/// | SnapshotIsolation | No | No | No | Yes |
/// | Serializable | No | No | No | No |
///
/// ("Yes" means the anomaly can occur at that level.)
///
/// # Performance
///
/// Higher isolation levels require more bookkeeping:
/// - `ReadCommitted`: Only tracks writes
/// - `SnapshotIsolation`: Tracks writes + snapshot versioning
/// - `Serializable`: Tracks writes + reads + SSI validation
///
/// Within the transaction manager itself, only `Serializable` changes commit
/// validation (the read set is checked in addition to the write set).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum IsolationLevel {
    /// Read Committed: sees only committed data, but may see different
    /// versions of the same row within a transaction.
    ///
    /// Lowest overhead, highest throughput, but weaker consistency.
    ReadCommitted,

    /// Snapshot Isolation (default): each transaction sees a consistent
    /// snapshot as of transaction start. Prevents non-repeatable reads
    /// and phantom reads.
    ///
    /// Vulnerable to write skew anomaly.
    #[default]
    SnapshotIsolation,

    /// Serializable Snapshot Isolation (SSI): provides full serializability
    /// by detecting read-write conflicts in addition to write-write conflicts.
    ///
    /// Prevents all anomalies including write skew, but may abort more
    /// transactions due to stricter conflict detection.
    Serializable,
}
64
/// Entity identifier used as the element type of a transaction's read and
/// write sets.
///
/// Unifies node and edge IDs so both kinds of entity can be tracked in a
/// single `HashSet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityId {
    /// A node, identified by its `NodeId`.
    Node(NodeId),
    /// An edge, identified by its `EdgeId`.
    Edge(EdgeId),
}
73
74impl From<NodeId> for EntityId {
75    fn from(id: NodeId) -> Self {
76        Self::Node(id)
77    }
78}
79
80impl From<EdgeId> for EntityId {
81    fn from(id: EdgeId) -> Self {
82        Self::Edge(id)
83    }
84}
85
/// Bookkeeping record for a single transaction.
///
/// The record is created when the transaction begins and is kept after
/// commit/abort (with `state` updated) until `TransactionManager::gc`
/// removes it, so committed write sets stay available for conflict checks.
pub struct TransactionInfo {
    /// Transaction state.
    pub state: TransactionState,
    /// Isolation level for this transaction.
    pub isolation_level: IsolationLevel,
    /// Start epoch (snapshot epoch for reads).
    pub start_epoch: EpochId,
    /// Set of entities written by this transaction.
    pub write_set: HashSet<EntityId>,
    /// Set of entities read by this transaction (for serializable isolation).
    pub read_set: HashSet<EntityId>,
}
99
100impl TransactionInfo {
101    /// Creates a new transaction info with the given isolation level.
102    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
103        Self {
104            state: TransactionState::Active,
105            isolation_level,
106            start_epoch,
107            write_set: HashSet::new(),
108            read_set: HashSet::new(),
109        }
110    }
111}
112
/// Manages transactions and MVCC versioning.
///
/// Hands out transaction IDs, tracks each transaction's state and read/write
/// sets, validates conflicts at write and commit time, and advances the
/// global epoch by one on every successful commit.
pub struct TransactionManager {
    /// Next transaction ID to hand out (monotonically increasing).
    next_transaction_id: AtomicU64,
    /// Current epoch; incremented on every successful commit.
    current_epoch: AtomicU64,
    /// Number of currently active transactions (for fast-path conflict skip).
    active_count: AtomicU64,
    /// All tracked transactions (active, committed and aborted) until `gc`
    /// removes the finished ones.
    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
    /// Committed transaction epochs (for conflict detection).
    /// Maps TransactionId -> commit epoch.
    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
}
127
128impl TransactionManager {
129    /// Creates a new transaction manager.
130    #[must_use]
131    pub fn new() -> Self {
132        Self {
133            // Start at 2 to avoid collision with TransactionId::SYSTEM (which is 1)
134            // TransactionId::INVALID = u64::MAX, TransactionId::SYSTEM = 1, user transactions start at 2
135            next_transaction_id: AtomicU64::new(2),
136            current_epoch: AtomicU64::new(0),
137            active_count: AtomicU64::new(0),
138            transactions: RwLock::new(FxHashMap::default()),
139            committed_epochs: RwLock::new(FxHashMap::default()),
140        }
141    }
142
143    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
144    pub fn begin(&self) -> TransactionId {
145        self.begin_with_isolation(IsolationLevel::default())
146    }
147
148    /// Begins a new transaction with the specified isolation level.
149    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
150        let transaction_id =
151            TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
152        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
153
154        let info = TransactionInfo::new(epoch, isolation_level);
155        self.transactions.write().insert(transaction_id, info);
156        self.active_count.fetch_add(1, Ordering::Relaxed);
157        transaction_id
158    }
159
160    /// Returns the isolation level of a transaction.
161    pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
162        self.transactions
163            .read()
164            .get(&transaction_id)
165            .map(|info| info.isolation_level)
166    }
167
168    /// Records a write operation for the transaction.
169    ///
170    /// Uses first-writer-wins: if another active transaction has already
171    /// written to the same entity, returns a write-write conflict error
172    /// immediately (before the caller mutates the store).
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if the transaction is not active or if another
177    /// active transaction has already written to the same entity.
178    pub fn record_write(
179        &self,
180        transaction_id: TransactionId,
181        entity: impl Into<EntityId>,
182    ) -> Result<()> {
183        let entity = entity.into();
184        let mut txns = self.transactions.write();
185
186        // First-writer-wins conflict detection. Skip the scan when only one
187        // transaction is active (common case for auto-commit).
188        if self.active_count.load(Ordering::Relaxed) > 1 {
189            for (other_tx, other_info) in txns.iter() {
190                if *other_tx != transaction_id
191                    && other_info.state == TransactionState::Active
192                    && other_info.write_set.contains(&entity)
193                {
194                    return Err(Error::Transaction(TransactionError::WriteConflict(
195                        format!("Write-write conflict on entity {entity:?}"),
196                    )));
197                }
198            }
199        }
200
201        // Single lookup: get_mut for both state check and write_set insert
202        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
203            Error::Transaction(TransactionError::InvalidState(
204                "Transaction not found".to_string(),
205            ))
206        })?;
207
208        if info.state != TransactionState::Active {
209            return Err(Error::Transaction(TransactionError::InvalidState(
210                "Transaction is not active".to_string(),
211            )));
212        }
213
214        info.write_set.insert(entity);
215        Ok(())
216    }
217
218    /// Records a read operation for the transaction (for serializable isolation).
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if the transaction is not active.
223    pub fn record_read(
224        &self,
225        transaction_id: TransactionId,
226        entity: impl Into<EntityId>,
227    ) -> Result<()> {
228        let mut txns = self.transactions.write();
229        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
230            Error::Transaction(TransactionError::InvalidState(
231                "Transaction not found".to_string(),
232            ))
233        })?;
234
235        if info.state != TransactionState::Active {
236            return Err(Error::Transaction(TransactionError::InvalidState(
237                "Transaction is not active".to_string(),
238            )));
239        }
240
241        info.read_set.insert(entity.into());
242        Ok(())
243    }
244
245    /// Commits a transaction with conflict detection.
246    ///
247    /// # Conflict Detection
248    ///
249    /// - **All isolation levels**: Write-write conflicts (two transactions writing
250    ///   to the same entity) are always detected and cause the second committer to abort.
251    ///
252    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
253    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
254    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if:
259    /// - The transaction is not active
260    /// - There's a write-write conflict with another committed transaction
261    /// - (Serializable only) There's a read-write conflict (SSI violation)
262    pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
263        let mut txns = self.transactions.write();
264        let committed = self.committed_epochs.read();
265
266        // First, validate the transaction exists and is active
267        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
268            let info = txns.get(&transaction_id).ok_or_else(|| {
269                Error::Transaction(TransactionError::InvalidState(
270                    "Transaction not found".to_string(),
271                ))
272            })?;
273
274            if info.state != TransactionState::Active {
275                return Err(Error::Transaction(TransactionError::InvalidState(
276                    "Transaction is not active".to_string(),
277                )));
278            }
279
280            (
281                info.isolation_level,
282                info.start_epoch,
283                info.write_set.clone(),
284                info.read_set.clone(),
285            )
286        };
287
288        // Check for write-write conflicts with transactions that committed
289        // after our snapshot (i.e., concurrent writers to the same entities).
290        // Transactions committed before our start_epoch are part of our visible
291        // snapshot, so overwriting their values is not a conflict.
292        for (other_tx, commit_epoch) in committed.iter() {
293            if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
294                // Check if that transaction wrote to any of our entities
295                if let Some(other_info) = txns.get(other_tx) {
296                    for entity in &our_write_set {
297                        if other_info.write_set.contains(entity) {
298                            return Err(Error::Transaction(TransactionError::WriteConflict(
299                                format!("Write-write conflict on entity {:?}", entity),
300                            )));
301                        }
302                    }
303                }
304            }
305        }
306
307        // SSI validation for Serializable isolation level
308        // Check for read-write conflicts: if we read an entity that another
309        // transaction (that committed after we started) wrote, we have a
310        // "rw-antidependency" which can cause write skew.
311        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
312            for (other_tx, commit_epoch) in committed.iter() {
313                if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
314                    // Check if that transaction wrote to any entity we read
315                    if let Some(other_info) = txns.get(other_tx) {
316                        for entity in &our_read_set {
317                            if other_info.write_set.contains(entity) {
318                                return Err(Error::Transaction(
319                                    TransactionError::SerializationFailure(format!(
320                                        "Read-write conflict on entity {:?}: \
321                                         another transaction modified data we read",
322                                        entity
323                                    )),
324                                ));
325                            }
326                        }
327                    }
328                }
329            }
330
331            // Also check against transactions that are already marked committed
332            // but not yet in committed_epochs map
333            for (other_tx, other_info) in txns.iter() {
334                if *other_tx == transaction_id {
335                    continue;
336                }
337                if other_info.state == TransactionState::Committed {
338                    // If we can see their write set and we read something they wrote
339                    for entity in &our_read_set {
340                        if other_info.write_set.contains(entity) {
341                            // Check if they committed after we started
342                            if let Some(commit_epoch) = committed.get(other_tx)
343                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
344                            {
345                                return Err(Error::Transaction(
346                                    TransactionError::SerializationFailure(format!(
347                                        "Read-write conflict on entity {:?}: \
348                                             another transaction modified data we read",
349                                        entity
350                                    )),
351                                ));
352                            }
353                        }
354                    }
355                }
356            }
357        }
358
359        // Commit successful - advance epoch atomically
360        // SeqCst ensures all threads see commits in a consistent total order
361        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
362
363        // Now update state
364        if let Some(info) = txns.get_mut(&transaction_id) {
365            info.state = TransactionState::Committed;
366        }
367        self.active_count.fetch_sub(1, Ordering::Relaxed);
368
369        // Record commit epoch (need to drop read lock first)
370        drop(committed);
371        self.committed_epochs
372            .write()
373            .insert(transaction_id, commit_epoch);
374
375        Ok(commit_epoch)
376    }
377
378    /// Aborts a transaction.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the transaction is not active.
383    pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
384        let mut txns = self.transactions.write();
385
386        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
387            Error::Transaction(TransactionError::InvalidState(
388                "Transaction not found".to_string(),
389            ))
390        })?;
391
392        if info.state != TransactionState::Active {
393            return Err(Error::Transaction(TransactionError::InvalidState(
394                "Transaction is not active".to_string(),
395            )));
396        }
397
398        info.state = TransactionState::Aborted;
399        self.active_count.fetch_sub(1, Ordering::Relaxed);
400        Ok(())
401    }
402
403    /// Returns the write set of a transaction.
404    ///
405    /// This returns a copy of the entities written by this transaction,
406    /// used for rollback to discard uncommitted versions.
407    ///
408    /// # Errors
409    ///
410    /// Returns a `TransactionError::InvalidState` if the transaction is not found.
411    pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
412        let txns = self.transactions.read();
413        let info = txns.get(&transaction_id).ok_or_else(|| {
414            Error::Transaction(TransactionError::InvalidState(
415                "Transaction not found".to_string(),
416            ))
417        })?;
418        Ok(info.write_set.clone())
419    }
420
421    /// Replaces the write set of a transaction (used for savepoint rollback).
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if the transaction is not found.
426    pub fn reset_write_set(
427        &self,
428        transaction_id: TransactionId,
429        write_set: HashSet<EntityId>,
430    ) -> Result<()> {
431        let mut txns = self.transactions.write();
432        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
433            Error::Transaction(TransactionError::InvalidState(
434                "Transaction not found".to_string(),
435            ))
436        })?;
437        info.write_set = write_set;
438        Ok(())
439    }
440
441    /// Aborts all active transactions.
442    ///
443    /// Used during database shutdown.
444    pub fn abort_all_active(&self) {
445        let mut txns = self.transactions.write();
446        for info in txns.values_mut() {
447            if info.state == TransactionState::Active {
448                info.state = TransactionState::Aborted;
449                self.active_count.fetch_sub(1, Ordering::Relaxed);
450            }
451        }
452    }
453
454    /// Returns the state of a transaction.
455    pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
456        self.transactions
457            .read()
458            .get(&transaction_id)
459            .map(|info| info.state)
460    }
461
462    /// Returns the start epoch of a transaction.
463    pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
464        self.transactions
465            .read()
466            .get(&transaction_id)
467            .map(|info| info.start_epoch)
468    }
469
470    /// Returns the current epoch.
471    #[must_use]
472    pub fn current_epoch(&self) -> EpochId {
473        EpochId::new(self.current_epoch.load(Ordering::Acquire))
474    }
475
476    /// Synchronizes the epoch counter to at least the given value.
477    ///
478    /// Used after snapshot import and WAL recovery to align the
479    /// TransactionManager epoch with the store epoch.
480    pub fn sync_epoch(&self, epoch: EpochId) {
481        self.current_epoch
482            .fetch_max(epoch.as_u64(), Ordering::SeqCst);
483    }
484
485    /// Returns the minimum epoch that must be preserved for active transactions.
486    ///
487    /// This is used for garbage collection - versions visible at this epoch
488    /// must be preserved.
489    #[must_use]
490    pub fn min_active_epoch(&self) -> EpochId {
491        let txns = self.transactions.read();
492        txns.values()
493            .filter(|info| info.state == TransactionState::Active)
494            .map(|info| info.start_epoch)
495            .min()
496            .unwrap_or_else(|| self.current_epoch())
497    }
498
499    /// Returns the number of active transactions.
500    #[must_use]
501    pub fn active_count(&self) -> usize {
502        self.transactions
503            .read()
504            .values()
505            .filter(|info| info.state == TransactionState::Active)
506            .count()
507    }
508
509    /// Cleans up completed transactions that are no longer needed for conflict detection.
510    ///
511    /// A committed transaction's write set must be preserved until all transactions
512    /// that started before its commit have completed. This ensures write-write
513    /// conflict detection works correctly.
514    ///
515    /// Returns the number of transactions cleaned up.
516    pub fn gc(&self) -> usize {
517        let mut txns = self.transactions.write();
518        let mut committed = self.committed_epochs.write();
519
520        // Find the minimum start epoch among active transactions
521        let min_active_start = txns
522            .values()
523            .filter(|info| info.state == TransactionState::Active)
524            .map(|info| info.start_epoch)
525            .min();
526
527        let initial_count = txns.len();
528
529        // Collect transactions safe to remove
530        let to_remove: Vec<TransactionId> = txns
531            .iter()
532            .filter(|(transaction_id, info)| {
533                match info.state {
534                    TransactionState::Active => false, // Never remove active transactions
535                    TransactionState::Aborted => true, // Always safe to remove aborted transactions
536                    TransactionState::Committed => {
537                        // Only remove committed transactions if their commit epoch
538                        // is older than all active transactions' start epochs
539                        if let Some(min_start) = min_active_start {
540                            if let Some(commit_epoch) = committed.get(*transaction_id) {
541                                // Safe to remove if committed before all active txns started
542                                commit_epoch.as_u64() < min_start.as_u64()
543                            } else {
544                                // No commit epoch recorded, keep it to be safe
545                                false
546                            }
547                        } else {
548                            // No active transactions, safe to remove all committed
549                            true
550                        }
551                    }
552                }
553            })
554            .map(|(id, _)| *id)
555            .collect();
556
557        for id in &to_remove {
558            txns.remove(id);
559            committed.remove(id);
560        }
561
562        initial_count - txns.len()
563    }
564
565    /// Marks a transaction as committed at a specific epoch.
566    ///
567    /// Used during recovery to restore transaction state.
568    pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
569        self.committed_epochs.write().insert(transaction_id, epoch);
570    }
571
572    /// Returns the last assigned transaction ID.
573    ///
574    /// Returns `None` if no transactions have been started yet.
575    #[must_use]
576    pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
577        let next = self.next_transaction_id.load(Ordering::Relaxed);
578        if next > 1 {
579            Some(TransactionId::new(next - 1))
580        } else {
581            None
582        }
583    }
584}
585
586impl Default for TransactionManager {
587    fn default() -> Self {
588        Self::new()
589    }
590}
591
592#[cfg(test)]
593mod tests {
594    use super::*;
595
596    #[test]
597    fn test_begin_commit() {
598        let mgr = TransactionManager::new();
599
600        let tx = mgr.begin();
601        assert_eq!(mgr.state(tx), Some(TransactionState::Active));
602
603        let commit_epoch = mgr.commit(tx).unwrap();
604        assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
605        assert!(commit_epoch.as_u64() > 0);
606    }
607
608    #[test]
609    fn test_begin_abort() {
610        let mgr = TransactionManager::new();
611
612        let tx = mgr.begin();
613        mgr.abort(tx).unwrap();
614        assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
615    }
616
617    #[test]
618    fn test_epoch_advancement() {
619        let mgr = TransactionManager::new();
620
621        let initial_epoch = mgr.current_epoch();
622
623        let tx = mgr.begin();
624        let commit_epoch = mgr.commit(tx).unwrap();
625
626        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
627        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
628    }
629
630    #[test]
631    fn test_gc_preserves_needed_write_sets() {
632        let mgr = TransactionManager::new();
633
634        let tx1 = mgr.begin();
635        let tx2 = mgr.begin();
636
637        mgr.commit(tx1).unwrap();
638        // tx2 still active - started before tx1 committed
639
640        assert_eq!(mgr.active_count(), 1);
641
642        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
643        let cleaned = mgr.gc();
644        assert_eq!(cleaned, 0);
645
646        // Both transactions should remain
647        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
648        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
649    }
650
651    #[test]
652    fn test_gc_removes_old_commits() {
653        let mgr = TransactionManager::new();
654
655        // tx1 commits at epoch 1
656        let tx1 = mgr.begin();
657        mgr.commit(tx1).unwrap();
658
659        // tx2 starts at epoch 1, commits at epoch 2
660        let tx2 = mgr.begin();
661        mgr.commit(tx2).unwrap();
662
663        // tx3 starts at epoch 2
664        let tx3 = mgr.begin();
665
666        // At this point:
667        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
668        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
669        let cleaned = mgr.gc();
670        assert_eq!(cleaned, 1); // Only tx1 removed
671
672        assert_eq!(mgr.state(tx1), None);
673        assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); // Preserved for conflict detection
674        assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
675
676        // After tx3 commits, tx2 can be GC'd
677        mgr.commit(tx3).unwrap();
678        let cleaned = mgr.gc();
679        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
680    }
681
682    #[test]
683    fn test_gc_removes_aborted() {
684        let mgr = TransactionManager::new();
685
686        let tx1 = mgr.begin();
687        let tx2 = mgr.begin();
688
689        mgr.abort(tx1).unwrap();
690        // tx2 still active
691
692        // Aborted transactions are always safe to remove
693        let cleaned = mgr.gc();
694        assert_eq!(cleaned, 1);
695
696        assert_eq!(mgr.state(tx1), None);
697        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
698    }
699
700    #[test]
701    fn test_write_tracking() {
702        let mgr = TransactionManager::new();
703
704        let tx = mgr.begin();
705
706        // Record writes
707        mgr.record_write(tx, NodeId::new(1)).unwrap();
708        mgr.record_write(tx, NodeId::new(2)).unwrap();
709        mgr.record_write(tx, EdgeId::new(100)).unwrap();
710
711        // Should commit successfully (no conflicts)
712        assert!(mgr.commit(tx).is_ok());
713    }
714
715    #[test]
716    fn test_min_active_epoch() {
717        let mgr = TransactionManager::new();
718
719        // No active transactions - should return current epoch
720        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
721
722        // Start some transactions
723        let tx1 = mgr.begin();
724        let epoch1 = mgr.start_epoch(tx1).unwrap();
725
726        // Advance epoch
727        let tx2 = mgr.begin();
728        mgr.commit(tx2).unwrap();
729
730        let _tx3 = mgr.begin();
731
732        // min_active_epoch should be tx1's start epoch (earliest active)
733        assert_eq!(mgr.min_active_epoch(), epoch1);
734    }
735
736    #[test]
737    fn test_abort_all_active() {
738        let mgr = TransactionManager::new();
739
740        let tx1 = mgr.begin();
741        let tx2 = mgr.begin();
742        let tx3 = mgr.begin();
743
744        mgr.commit(tx1).unwrap();
745        // tx2 and tx3 still active
746
747        mgr.abort_all_active();
748
749        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); // Already committed
750        assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
751        assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
752    }
753
754    #[test]
755    fn test_start_epoch_snapshot() {
756        let mgr = TransactionManager::new();
757
758        // Start epoch for tx1
759        let tx1 = mgr.begin();
760        let start1 = mgr.start_epoch(tx1).unwrap();
761
762        // Commit tx1, advancing epoch
763        mgr.commit(tx1).unwrap();
764
765        // Start tx2 after epoch advanced
766        let tx2 = mgr.begin();
767        let start2 = mgr.start_epoch(tx2).unwrap();
768
769        // tx2 should have a later start epoch
770        assert!(start2.as_u64() > start1.as_u64());
771    }
772
773    #[test]
774    fn test_write_write_conflict_detection() {
775        let mgr = TransactionManager::new();
776
777        // Both transactions start at the same epoch
778        let tx1 = mgr.begin();
779        let tx2 = mgr.begin();
780
781        // First writer succeeds
782        let entity = NodeId::new(42);
783        mgr.record_write(tx1, entity).unwrap();
784
785        // Second writer is rejected immediately (first-writer-wins)
786        let result = mgr.record_write(tx2, entity);
787        assert!(result.is_err());
788        assert!(
789            result
790                .unwrap_err()
791                .to_string()
792                .contains("Write-write conflict"),
793            "Expected write-write conflict error"
794        );
795
796        // First commit succeeds (no conflict at commit time either)
797        let result1 = mgr.commit(tx1);
798        assert!(result1.is_ok());
799    }
800
801    #[test]
802    fn test_commit_epoch_monotonicity() {
803        let mgr = TransactionManager::new();
804
805        let mut epochs = Vec::new();
806
807        // Commit multiple transactions and verify epochs are strictly increasing
808        for _ in 0..10 {
809            let tx = mgr.begin();
810            let epoch = mgr.commit(tx).unwrap();
811            epochs.push(epoch.as_u64());
812        }
813
814        // Verify strict monotonicity
815        for i in 1..epochs.len() {
816            assert!(
817                epochs[i] > epochs[i - 1],
818                "Epoch {} ({}) should be greater than epoch {} ({})",
819                i,
820                epochs[i],
821                i - 1,
822                epochs[i - 1]
823            );
824        }
825    }
826
827    #[test]
828    fn test_concurrent_commits_via_threads() {
829        use std::sync::Arc;
830        use std::thread;
831
832        let mgr = Arc::new(TransactionManager::new());
833        let num_threads = 10;
834        let commits_per_thread = 100;
835
836        let handles: Vec<_> = (0..num_threads)
837            .map(|_| {
838                let mgr = Arc::clone(&mgr);
839                thread::spawn(move || {
840                    let mut epochs = Vec::new();
841                    for _ in 0..commits_per_thread {
842                        let tx = mgr.begin();
843                        let epoch = mgr.commit(tx).unwrap();
844                        epochs.push(epoch.as_u64());
845                    }
846                    epochs
847                })
848            })
849            .collect();
850
851        let mut all_epochs: Vec<u64> = handles
852            .into_iter()
853            .flat_map(|h| h.join().unwrap())
854            .collect();
855
856        // All epochs should be unique (no duplicates)
857        all_epochs.sort_unstable();
858        let unique_count = all_epochs.len();
859        all_epochs.dedup();
860        assert_eq!(
861            all_epochs.len(),
862            unique_count,
863            "All commit epochs should be unique"
864        );
865
866        // Final epoch should equal number of commits
867        assert_eq!(
868            mgr.current_epoch().as_u64(),
869            (num_threads * commits_per_thread) as u64,
870            "Final epoch should equal total commits"
871        );
872    }
873
874    #[test]
875    fn test_isolation_level_default() {
876        let mgr = TransactionManager::new();
877
878        let tx = mgr.begin();
879        assert_eq!(
880            mgr.isolation_level(tx),
881            Some(IsolationLevel::SnapshotIsolation)
882        );
883    }
884
885    #[test]
886    fn test_isolation_level_explicit() {
887        let mgr = TransactionManager::new();
888
889        let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
890        let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
891        let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
892
893        assert_eq!(
894            mgr.isolation_level(transaction_rc),
895            Some(IsolationLevel::ReadCommitted)
896        );
897        assert_eq!(
898            mgr.isolation_level(transaction_si),
899            Some(IsolationLevel::SnapshotIsolation)
900        );
901        assert_eq!(
902            mgr.isolation_level(transaction_ser),
903            Some(IsolationLevel::Serializable)
904        );
905    }
906
907    #[test]
908    fn test_ssi_read_write_conflict_detected() {
909        let mgr = TransactionManager::new();
910
911        // tx1 starts with Serializable isolation
912        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
913
914        // tx2 starts and will modify an entity
915        let tx2 = mgr.begin();
916
917        // tx1 reads entity 42
918        let entity = NodeId::new(42);
919        mgr.record_read(tx1, entity).unwrap();
920
921        // tx2 writes to the same entity and commits
922        mgr.record_write(tx2, entity).unwrap();
923        mgr.commit(tx2).unwrap();
924
925        // tx1 tries to commit - should fail due to SSI read-write conflict
926        let result = mgr.commit(tx1);
927        assert!(result.is_err());
928        assert!(
929            result
930                .unwrap_err()
931                .to_string()
932                .contains("Serialization failure"),
933            "Expected serialization failure error"
934        );
935    }
936
937    #[test]
938    fn test_ssi_no_conflict_when_not_serializable() {
939        let mgr = TransactionManager::new();
940
941        // tx1 starts with default Snapshot Isolation
942        let tx1 = mgr.begin();
943
944        // tx2 starts and will modify an entity
945        let tx2 = mgr.begin();
946
947        // tx1 reads entity 42
948        let entity = NodeId::new(42);
949        mgr.record_read(tx1, entity).unwrap();
950
951        // tx2 writes to the same entity and commits
952        mgr.record_write(tx2, entity).unwrap();
953        mgr.commit(tx2).unwrap();
954
955        // tx1 should commit successfully (SI doesn't check read-write conflicts)
956        let result = mgr.commit(tx1);
957        assert!(
958            result.is_ok(),
959            "Snapshot Isolation should not detect read-write conflicts"
960        );
961    }
962
963    #[test]
964    fn test_ssi_no_conflict_when_write_before_read() {
965        let mgr = TransactionManager::new();
966
967        // tx1 writes and commits first
968        let tx1 = mgr.begin();
969        let entity = NodeId::new(42);
970        mgr.record_write(tx1, entity).unwrap();
971        mgr.commit(tx1).unwrap();
972
973        // tx2 starts AFTER tx1 committed and reads the entity
974        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
975        mgr.record_read(tx2, entity).unwrap();
976
977        // tx2 should commit successfully (tx1 committed before tx2 started)
978        let result = mgr.commit(tx2);
979        assert!(
980            result.is_ok(),
981            "Should not conflict when writer committed before reader started"
982        );
983    }
984
985    #[test]
986    fn test_write_skew_prevented_by_ssi() {
987        // Classic write skew scenario:
988        // Account A = 50, Account B = 50, constraint: A + B >= 0
989        // T1 reads A, B, writes A = A - 100
990        // T2 reads A, B, writes B = B - 100
991        // Without SSI, both could commit violating the constraint.
992
993        let mgr = TransactionManager::new();
994
995        let account_a = NodeId::new(1);
996        let account_b = NodeId::new(2);
997
998        // T1 and T2 both start with Serializable isolation
999        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1000        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1001
1002        // Both read both accounts
1003        mgr.record_read(tx1, account_a).unwrap();
1004        mgr.record_read(tx1, account_b).unwrap();
1005        mgr.record_read(tx2, account_a).unwrap();
1006        mgr.record_read(tx2, account_b).unwrap();
1007
1008        // T1 writes to A, T2 writes to B (no write-write conflict)
1009        mgr.record_write(tx1, account_a).unwrap();
1010        mgr.record_write(tx2, account_b).unwrap();
1011
1012        // T1 commits first
1013        let result1 = mgr.commit(tx1);
1014        assert!(result1.is_ok(), "First commit should succeed");
1015
1016        // T2 tries to commit - should fail because it read account_a which T1 wrote
1017        let result2 = mgr.commit(tx2);
1018        assert!(result2.is_err(), "Second commit should fail due to SSI");
1019        assert!(
1020            result2
1021                .unwrap_err()
1022                .to_string()
1023                .contains("Serialization failure"),
1024            "Expected serialization failure error for write skew prevention"
1025        );
1026    }
1027
1028    #[test]
1029    fn test_read_committed_allows_non_repeatable_reads() {
1030        let mgr = TransactionManager::new();
1031
1032        // tx1 starts with ReadCommitted isolation
1033        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1034        let entity = NodeId::new(42);
1035
1036        // tx1 reads entity
1037        mgr.record_read(tx1, entity).unwrap();
1038
1039        // tx2 writes and commits
1040        let tx2 = mgr.begin();
1041        mgr.record_write(tx2, entity).unwrap();
1042        mgr.commit(tx2).unwrap();
1043
1044        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
1045        let result = mgr.commit(tx1);
1046        assert!(
1047            result.is_ok(),
1048            "ReadCommitted should allow non-repeatable reads"
1049        );
1050    }
1051
1052    #[test]
1053    fn test_isolation_level_debug() {
1054        assert_eq!(
1055            format!("{:?}", IsolationLevel::ReadCommitted),
1056            "ReadCommitted"
1057        );
1058        assert_eq!(
1059            format!("{:?}", IsolationLevel::SnapshotIsolation),
1060            "SnapshotIsolation"
1061        );
1062        assert_eq!(
1063            format!("{:?}", IsolationLevel::Serializable),
1064            "Serializable"
1065        );
1066    }
1067
1068    #[test]
1069    fn test_isolation_level_default_trait() {
1070        let default: IsolationLevel = Default::default();
1071        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1072    }
1073
1074    #[test]
1075    fn test_ssi_concurrent_reads_no_conflict() {
1076        let mgr = TransactionManager::new();
1077
1078        let entity = NodeId::new(42);
1079
1080        // Both transactions read the same entity
1081        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1082        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1083
1084        mgr.record_read(tx1, entity).unwrap();
1085        mgr.record_read(tx2, entity).unwrap();
1086
1087        // Both should commit successfully (read-read is not a conflict)
1088        assert!(mgr.commit(tx1).is_ok());
1089        assert!(mgr.commit(tx2).is_ok());
1090    }
1091
1092    #[test]
1093    fn test_ssi_write_write_conflict() {
1094        let mgr = TransactionManager::new();
1095
1096        let entity = NodeId::new(42);
1097
1098        // Both transactions attempt to write the same entity
1099        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1100        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1101
1102        // First writer succeeds
1103        mgr.record_write(tx1, entity).unwrap();
1104
1105        // Second writer is rejected immediately (first-writer-wins)
1106        let result = mgr.record_write(tx2, entity);
1107        assert!(
1108            result.is_err(),
1109            "Second record_write should fail with write-write conflict"
1110        );
1111
1112        // First commit succeeds
1113        assert!(mgr.commit(tx1).is_ok());
1114    }
1115}