Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
/// Lifecycle state of a transaction tracked by the manager.
///
/// A transaction is created `Active` and leaves that state exactly once,
/// either by committing or by aborting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// The transaction has begun and has neither committed nor aborted.
    Active,
    /// The transaction committed successfully.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
21
/// Transaction isolation level.
///
/// Controls the consistency guarantees and performance tradeoffs for transactions.
/// The `Default` level is [`IsolationLevel::SnapshotIsolation`].
///
/// # Comparison
///
/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
/// |-------|-------------|----------------------|---------------|------------|
/// | ReadCommitted | No | Yes | Yes | Yes |
/// | SnapshotIsolation | No | No | No | Yes |
/// | Serializable | No | No | No | No |
///
/// ("Yes" means the anomaly can occur at that level.)
///
/// # Performance
///
/// Higher isolation levels require more bookkeeping:
/// - `ReadCommitted`: Only tracks writes
/// - `SnapshotIsolation`: Tracks writes + snapshot versioning
/// - `Serializable`: Tracks writes + reads + SSI validation
///
/// NOTE(review): `TransactionManager::record_read` stores reads regardless of
/// the isolation level; only a `Serializable` commit actually consults the
/// read set.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read Committed: sees only committed data, but may see different
    /// versions of the same row within a transaction.
    ///
    /// Lowest overhead, highest throughput, but weaker consistency.
    ReadCommitted,

    /// Snapshot Isolation (default): each transaction sees a consistent
    /// snapshot as of transaction start. Prevents non-repeatable reads
    /// and phantom reads.
    ///
    /// Vulnerable to the write skew anomaly.
    #[default]
    SnapshotIsolation,

    /// Serializable Snapshot Isolation (SSI): provides full serializability
    /// by detecting read-write conflicts in addition to write-write conflicts.
    ///
    /// Prevents all anomalies including write skew, but may abort more
    /// transactions due to stricter conflict detection.
    Serializable,
}
63
/// Identifies a graph entity (node or edge) inside a transaction's read and
/// write sets.
///
/// Both `NodeId` and `EdgeId` convert into this type via `From`, so tracking
/// methods can accept either identifier directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityId {
    /// A node, identified by its `NodeId`.
    Node(NodeId),
    /// An edge, identified by its `EdgeId`.
    Edge(EdgeId),
}
72
73impl From<NodeId> for EntityId {
74    fn from(id: NodeId) -> Self {
75        Self::Node(id)
76    }
77}
78
79impl From<EdgeId> for EntityId {
80    fn from(id: EdgeId) -> Self {
81        Self::Edge(id)
82    }
83}
84
/// Bookkeeping record for a single transaction.
///
/// The manager keeps one record per transaction. Records are not dropped when
/// the transaction finishes: committed and aborted records stay in the map
/// until `TransactionManager::gc` removes them.
pub struct TransactionInfo {
    /// Current lifecycle state.
    pub state: TransactionState,
    /// Isolation level chosen when the transaction began.
    pub isolation_level: IsolationLevel,
    /// Start epoch (snapshot epoch for reads); the manager's current epoch at
    /// the moment the transaction began.
    pub start_epoch: EpochId,
    /// Set of entities written by this transaction.
    pub write_set: HashSet<EntityId>,
    /// Set of entities read by this transaction (consulted at commit time for
    /// serializable isolation).
    pub read_set: HashSet<EntityId>,
}
98
99impl TransactionInfo {
100    /// Creates a new transaction info with the given isolation level.
101    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102        Self {
103            state: TransactionState::Active,
104            isolation_level,
105            start_epoch,
106            write_set: HashSet::new(),
107            read_set: HashSet::new(),
108        }
109    }
110}
111
/// Manages transactions and MVCC versioning.
///
/// Hands out transaction IDs, tracks each transaction's state and read/write
/// sets, advances the global epoch on commit and performs conflict detection.
pub struct TransactionManager {
    /// Next transaction ID to hand out.
    next_transaction_id: AtomicU64,
    /// Current epoch; incremented by one on every successful commit.
    current_epoch: AtomicU64,
    /// Number of currently active transactions (for fast-path conflict skip).
    active_count: AtomicU64,
    /// All tracked transactions (active, committed and aborted) until `gc`
    /// removes the finished ones.
    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
    /// Committed transaction epochs (for conflict detection).
    /// Maps TransactionId -> commit epoch.
    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
}
126
127impl TransactionManager {
128    /// Creates a new transaction manager.
129    #[must_use]
130    pub fn new() -> Self {
131        Self {
132            // Start at 2 to avoid collision with TransactionId::SYSTEM (which is 1)
133            // TransactionId::INVALID = u64::MAX, TransactionId::SYSTEM = 1, user transactions start at 2
134            next_transaction_id: AtomicU64::new(2),
135            current_epoch: AtomicU64::new(0),
136            active_count: AtomicU64::new(0),
137            transactions: RwLock::new(FxHashMap::default()),
138            committed_epochs: RwLock::new(FxHashMap::default()),
139        }
140    }
141
142    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
143    pub fn begin(&self) -> TransactionId {
144        self.begin_with_isolation(IsolationLevel::default())
145    }
146
147    /// Begins a new transaction with the specified isolation level.
148    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
149        let transaction_id =
150            TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
151        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
152
153        let info = TransactionInfo::new(epoch, isolation_level);
154        self.transactions.write().insert(transaction_id, info);
155        self.active_count.fetch_add(1, Ordering::Relaxed);
156        transaction_id
157    }
158
159    /// Returns the isolation level of a transaction.
160    pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
161        self.transactions
162            .read()
163            .get(&transaction_id)
164            .map(|info| info.isolation_level)
165    }
166
167    /// Records a write operation for the transaction.
168    ///
169    /// Uses first-writer-wins: if another active transaction has already
170    /// written to the same entity, returns a write-write conflict error
171    /// immediately (before the caller mutates the store).
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the transaction is not active or if another
176    /// active transaction has already written to the same entity.
177    pub fn record_write(
178        &self,
179        transaction_id: TransactionId,
180        entity: impl Into<EntityId>,
181    ) -> Result<()> {
182        let entity = entity.into();
183        let mut txns = self.transactions.write();
184
185        // First-writer-wins conflict detection. Skip the scan when only one
186        // transaction is active (common case for auto-commit).
187        if self.active_count.load(Ordering::Relaxed) > 1 {
188            for (other_tx, other_info) in txns.iter() {
189                if *other_tx != transaction_id
190                    && other_info.state == TransactionState::Active
191                    && other_info.write_set.contains(&entity)
192                {
193                    return Err(Error::Transaction(TransactionError::WriteConflict(
194                        format!("Write-write conflict on entity {entity:?}"),
195                    )));
196                }
197            }
198        }
199
200        // Single lookup: get_mut for both state check and write_set insert
201        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
202            Error::Transaction(TransactionError::InvalidState(
203                "Transaction not found".to_string(),
204            ))
205        })?;
206
207        if info.state != TransactionState::Active {
208            return Err(Error::Transaction(TransactionError::InvalidState(
209                "Transaction is not active".to_string(),
210            )));
211        }
212
213        info.write_set.insert(entity);
214        Ok(())
215    }
216
217    /// Records a read operation for the transaction (for serializable isolation).
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the transaction is not active.
222    pub fn record_read(
223        &self,
224        transaction_id: TransactionId,
225        entity: impl Into<EntityId>,
226    ) -> Result<()> {
227        let mut txns = self.transactions.write();
228        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
229            Error::Transaction(TransactionError::InvalidState(
230                "Transaction not found".to_string(),
231            ))
232        })?;
233
234        if info.state != TransactionState::Active {
235            return Err(Error::Transaction(TransactionError::InvalidState(
236                "Transaction is not active".to_string(),
237            )));
238        }
239
240        info.read_set.insert(entity.into());
241        Ok(())
242    }
243
244    /// Commits a transaction with conflict detection.
245    ///
246    /// # Conflict Detection
247    ///
248    /// - **All isolation levels**: Write-write conflicts (two transactions writing
249    ///   to the same entity) are always detected and cause the second committer to abort.
250    ///
251    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
252    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
253    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if:
258    /// - The transaction is not active
259    /// - There's a write-write conflict with another committed transaction
260    /// - (Serializable only) There's a read-write conflict (SSI violation)
261    pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
262        let mut txns = self.transactions.write();
263        let committed = self.committed_epochs.read();
264
265        // First, validate the transaction exists and is active
266        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
267            let info = txns.get(&transaction_id).ok_or_else(|| {
268                Error::Transaction(TransactionError::InvalidState(
269                    "Transaction not found".to_string(),
270                ))
271            })?;
272
273            if info.state != TransactionState::Active {
274                return Err(Error::Transaction(TransactionError::InvalidState(
275                    "Transaction is not active".to_string(),
276                )));
277            }
278
279            (
280                info.isolation_level,
281                info.start_epoch,
282                info.write_set.clone(),
283                info.read_set.clone(),
284            )
285        };
286
287        // Check for write-write conflicts with transactions that committed
288        // after our snapshot (i.e., concurrent writers to the same entities).
289        // Transactions committed before our start_epoch are part of our visible
290        // snapshot, so overwriting their values is not a conflict.
291        for (other_tx, commit_epoch) in committed.iter() {
292            if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
293                // Check if that transaction wrote to any of our entities
294                if let Some(other_info) = txns.get(other_tx) {
295                    for entity in &our_write_set {
296                        if other_info.write_set.contains(entity) {
297                            return Err(Error::Transaction(TransactionError::WriteConflict(
298                                format!("Write-write conflict on entity {:?}", entity),
299                            )));
300                        }
301                    }
302                }
303            }
304        }
305
306        // SSI validation for Serializable isolation level
307        // Check for read-write conflicts: if we read an entity that another
308        // transaction (that committed after we started) wrote, we have a
309        // "rw-antidependency" which can cause write skew.
310        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
311            for (other_tx, commit_epoch) in committed.iter() {
312                if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
313                    // Check if that transaction wrote to any entity we read
314                    if let Some(other_info) = txns.get(other_tx) {
315                        for entity in &our_read_set {
316                            if other_info.write_set.contains(entity) {
317                                return Err(Error::Transaction(
318                                    TransactionError::SerializationFailure(format!(
319                                        "Read-write conflict on entity {:?}: \
320                                         another transaction modified data we read",
321                                        entity
322                                    )),
323                                ));
324                            }
325                        }
326                    }
327                }
328            }
329
330            // Also check against transactions that are already marked committed
331            // but not yet in committed_epochs map
332            for (other_tx, other_info) in txns.iter() {
333                if *other_tx == transaction_id {
334                    continue;
335                }
336                if other_info.state == TransactionState::Committed {
337                    // If we can see their write set and we read something they wrote
338                    for entity in &our_read_set {
339                        if other_info.write_set.contains(entity) {
340                            // Check if they committed after we started
341                            if let Some(commit_epoch) = committed.get(other_tx)
342                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
343                            {
344                                return Err(Error::Transaction(
345                                    TransactionError::SerializationFailure(format!(
346                                        "Read-write conflict on entity {:?}: \
347                                             another transaction modified data we read",
348                                        entity
349                                    )),
350                                ));
351                            }
352                        }
353                    }
354                }
355            }
356        }
357
358        // Commit successful - advance epoch atomically
359        // SeqCst ensures all threads see commits in a consistent total order
360        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
361
362        // Now update state
363        if let Some(info) = txns.get_mut(&transaction_id) {
364            info.state = TransactionState::Committed;
365        }
366        self.active_count.fetch_sub(1, Ordering::Relaxed);
367
368        // Record commit epoch (need to drop read lock first)
369        drop(committed);
370        self.committed_epochs
371            .write()
372            .insert(transaction_id, commit_epoch);
373
374        Ok(commit_epoch)
375    }
376
377    /// Aborts a transaction.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if the transaction is not active.
382    pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
383        let mut txns = self.transactions.write();
384
385        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
386            Error::Transaction(TransactionError::InvalidState(
387                "Transaction not found".to_string(),
388            ))
389        })?;
390
391        if info.state != TransactionState::Active {
392            return Err(Error::Transaction(TransactionError::InvalidState(
393                "Transaction is not active".to_string(),
394            )));
395        }
396
397        info.state = TransactionState::Aborted;
398        self.active_count.fetch_sub(1, Ordering::Relaxed);
399        Ok(())
400    }
401
402    /// Returns the write set of a transaction.
403    ///
404    /// This returns a copy of the entities written by this transaction,
405    /// used for rollback to discard uncommitted versions.
406    pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
407        let txns = self.transactions.read();
408        let info = txns.get(&transaction_id).ok_or_else(|| {
409            Error::Transaction(TransactionError::InvalidState(
410                "Transaction not found".to_string(),
411            ))
412        })?;
413        Ok(info.write_set.clone())
414    }
415
416    /// Replaces the write set of a transaction (used for savepoint rollback).
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the transaction is not found.
421    pub fn reset_write_set(
422        &self,
423        transaction_id: TransactionId,
424        write_set: HashSet<EntityId>,
425    ) -> Result<()> {
426        let mut txns = self.transactions.write();
427        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
428            Error::Transaction(TransactionError::InvalidState(
429                "Transaction not found".to_string(),
430            ))
431        })?;
432        info.write_set = write_set;
433        Ok(())
434    }
435
436    /// Aborts all active transactions.
437    ///
438    /// Used during database shutdown.
439    pub fn abort_all_active(&self) {
440        let mut txns = self.transactions.write();
441        for info in txns.values_mut() {
442            if info.state == TransactionState::Active {
443                info.state = TransactionState::Aborted;
444                self.active_count.fetch_sub(1, Ordering::Relaxed);
445            }
446        }
447    }
448
449    /// Returns the state of a transaction.
450    pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
451        self.transactions
452            .read()
453            .get(&transaction_id)
454            .map(|info| info.state)
455    }
456
457    /// Returns the start epoch of a transaction.
458    pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
459        self.transactions
460            .read()
461            .get(&transaction_id)
462            .map(|info| info.start_epoch)
463    }
464
465    /// Returns the current epoch.
466    #[must_use]
467    pub fn current_epoch(&self) -> EpochId {
468        EpochId::new(self.current_epoch.load(Ordering::Acquire))
469    }
470
471    /// Synchronizes the epoch counter to at least the given value.
472    ///
473    /// Used after snapshot import and WAL recovery to align the
474    /// TransactionManager epoch with the store epoch.
475    pub fn sync_epoch(&self, epoch: EpochId) {
476        self.current_epoch
477            .fetch_max(epoch.as_u64(), Ordering::SeqCst);
478    }
479
480    /// Returns the minimum epoch that must be preserved for active transactions.
481    ///
482    /// This is used for garbage collection - versions visible at this epoch
483    /// must be preserved.
484    #[must_use]
485    pub fn min_active_epoch(&self) -> EpochId {
486        let txns = self.transactions.read();
487        txns.values()
488            .filter(|info| info.state == TransactionState::Active)
489            .map(|info| info.start_epoch)
490            .min()
491            .unwrap_or_else(|| self.current_epoch())
492    }
493
494    /// Returns the number of active transactions.
495    #[must_use]
496    pub fn active_count(&self) -> usize {
497        self.transactions
498            .read()
499            .values()
500            .filter(|info| info.state == TransactionState::Active)
501            .count()
502    }
503
504    /// Cleans up completed transactions that are no longer needed for conflict detection.
505    ///
506    /// A committed transaction's write set must be preserved until all transactions
507    /// that started before its commit have completed. This ensures write-write
508    /// conflict detection works correctly.
509    ///
510    /// Returns the number of transactions cleaned up.
511    pub fn gc(&self) -> usize {
512        let mut txns = self.transactions.write();
513        let mut committed = self.committed_epochs.write();
514
515        // Find the minimum start epoch among active transactions
516        let min_active_start = txns
517            .values()
518            .filter(|info| info.state == TransactionState::Active)
519            .map(|info| info.start_epoch)
520            .min();
521
522        let initial_count = txns.len();
523
524        // Collect transactions safe to remove
525        let to_remove: Vec<TransactionId> = txns
526            .iter()
527            .filter(|(transaction_id, info)| {
528                match info.state {
529                    TransactionState::Active => false, // Never remove active transactions
530                    TransactionState::Aborted => true, // Always safe to remove aborted transactions
531                    TransactionState::Committed => {
532                        // Only remove committed transactions if their commit epoch
533                        // is older than all active transactions' start epochs
534                        if let Some(min_start) = min_active_start {
535                            if let Some(commit_epoch) = committed.get(*transaction_id) {
536                                // Safe to remove if committed before all active txns started
537                                commit_epoch.as_u64() < min_start.as_u64()
538                            } else {
539                                // No commit epoch recorded, keep it to be safe
540                                false
541                            }
542                        } else {
543                            // No active transactions, safe to remove all committed
544                            true
545                        }
546                    }
547                }
548            })
549            .map(|(id, _)| *id)
550            .collect();
551
552        for id in &to_remove {
553            txns.remove(id);
554            committed.remove(id);
555        }
556
557        initial_count - txns.len()
558    }
559
560    /// Marks a transaction as committed at a specific epoch.
561    ///
562    /// Used during recovery to restore transaction state.
563    pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
564        self.committed_epochs.write().insert(transaction_id, epoch);
565    }
566
567    /// Returns the last assigned transaction ID.
568    ///
569    /// Returns `None` if no transactions have been started yet.
570    #[must_use]
571    pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
572        let next = self.next_transaction_id.load(Ordering::Relaxed);
573        if next > 1 {
574            Some(TransactionId::new(next - 1))
575        } else {
576            None
577        }
578    }
579}
580
581impl Default for TransactionManager {
582    fn default() -> Self {
583        Self::new()
584    }
585}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590
591    #[test]
592    fn test_begin_commit() {
593        let mgr = TransactionManager::new();
594
595        let tx = mgr.begin();
596        assert_eq!(mgr.state(tx), Some(TransactionState::Active));
597
598        let commit_epoch = mgr.commit(tx).unwrap();
599        assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
600        assert!(commit_epoch.as_u64() > 0);
601    }
602
603    #[test]
604    fn test_begin_abort() {
605        let mgr = TransactionManager::new();
606
607        let tx = mgr.begin();
608        mgr.abort(tx).unwrap();
609        assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
610    }
611
612    #[test]
613    fn test_epoch_advancement() {
614        let mgr = TransactionManager::new();
615
616        let initial_epoch = mgr.current_epoch();
617
618        let tx = mgr.begin();
619        let commit_epoch = mgr.commit(tx).unwrap();
620
621        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
622        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
623    }
624
625    #[test]
626    fn test_gc_preserves_needed_write_sets() {
627        let mgr = TransactionManager::new();
628
629        let tx1 = mgr.begin();
630        let tx2 = mgr.begin();
631
632        mgr.commit(tx1).unwrap();
633        // tx2 still active - started before tx1 committed
634
635        assert_eq!(mgr.active_count(), 1);
636
637        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
638        let cleaned = mgr.gc();
639        assert_eq!(cleaned, 0);
640
641        // Both transactions should remain
642        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
643        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
644    }
645
646    #[test]
647    fn test_gc_removes_old_commits() {
648        let mgr = TransactionManager::new();
649
650        // tx1 commits at epoch 1
651        let tx1 = mgr.begin();
652        mgr.commit(tx1).unwrap();
653
654        // tx2 starts at epoch 1, commits at epoch 2
655        let tx2 = mgr.begin();
656        mgr.commit(tx2).unwrap();
657
658        // tx3 starts at epoch 2
659        let tx3 = mgr.begin();
660
661        // At this point:
662        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
663        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
664        let cleaned = mgr.gc();
665        assert_eq!(cleaned, 1); // Only tx1 removed
666
667        assert_eq!(mgr.state(tx1), None);
668        assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); // Preserved for conflict detection
669        assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
670
671        // After tx3 commits, tx2 can be GC'd
672        mgr.commit(tx3).unwrap();
673        let cleaned = mgr.gc();
674        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
675    }
676
677    #[test]
678    fn test_gc_removes_aborted() {
679        let mgr = TransactionManager::new();
680
681        let tx1 = mgr.begin();
682        let tx2 = mgr.begin();
683
684        mgr.abort(tx1).unwrap();
685        // tx2 still active
686
687        // Aborted transactions are always safe to remove
688        let cleaned = mgr.gc();
689        assert_eq!(cleaned, 1);
690
691        assert_eq!(mgr.state(tx1), None);
692        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
693    }
694
695    #[test]
696    fn test_write_tracking() {
697        let mgr = TransactionManager::new();
698
699        let tx = mgr.begin();
700
701        // Record writes
702        mgr.record_write(tx, NodeId::new(1)).unwrap();
703        mgr.record_write(tx, NodeId::new(2)).unwrap();
704        mgr.record_write(tx, EdgeId::new(100)).unwrap();
705
706        // Should commit successfully (no conflicts)
707        assert!(mgr.commit(tx).is_ok());
708    }
709
710    #[test]
711    fn test_min_active_epoch() {
712        let mgr = TransactionManager::new();
713
714        // No active transactions - should return current epoch
715        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
716
717        // Start some transactions
718        let tx1 = mgr.begin();
719        let epoch1 = mgr.start_epoch(tx1).unwrap();
720
721        // Advance epoch
722        let tx2 = mgr.begin();
723        mgr.commit(tx2).unwrap();
724
725        let _tx3 = mgr.begin();
726
727        // min_active_epoch should be tx1's start epoch (earliest active)
728        assert_eq!(mgr.min_active_epoch(), epoch1);
729    }
730
731    #[test]
732    fn test_abort_all_active() {
733        let mgr = TransactionManager::new();
734
735        let tx1 = mgr.begin();
736        let tx2 = mgr.begin();
737        let tx3 = mgr.begin();
738
739        mgr.commit(tx1).unwrap();
740        // tx2 and tx3 still active
741
742        mgr.abort_all_active();
743
744        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); // Already committed
745        assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
746        assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
747    }
748
749    #[test]
750    fn test_start_epoch_snapshot() {
751        let mgr = TransactionManager::new();
752
753        // Start epoch for tx1
754        let tx1 = mgr.begin();
755        let start1 = mgr.start_epoch(tx1).unwrap();
756
757        // Commit tx1, advancing epoch
758        mgr.commit(tx1).unwrap();
759
760        // Start tx2 after epoch advanced
761        let tx2 = mgr.begin();
762        let start2 = mgr.start_epoch(tx2).unwrap();
763
764        // tx2 should have a later start epoch
765        assert!(start2.as_u64() > start1.as_u64());
766    }
767
768    #[test]
769    fn test_write_write_conflict_detection() {
770        let mgr = TransactionManager::new();
771
772        // Both transactions start at the same epoch
773        let tx1 = mgr.begin();
774        let tx2 = mgr.begin();
775
776        // First writer succeeds
777        let entity = NodeId::new(42);
778        mgr.record_write(tx1, entity).unwrap();
779
780        // Second writer is rejected immediately (first-writer-wins)
781        let result = mgr.record_write(tx2, entity);
782        assert!(result.is_err());
783        assert!(
784            result
785                .unwrap_err()
786                .to_string()
787                .contains("Write-write conflict"),
788            "Expected write-write conflict error"
789        );
790
791        // First commit succeeds (no conflict at commit time either)
792        let result1 = mgr.commit(tx1);
793        assert!(result1.is_ok());
794    }
795
796    #[test]
797    fn test_commit_epoch_monotonicity() {
798        let mgr = TransactionManager::new();
799
800        let mut epochs = Vec::new();
801
802        // Commit multiple transactions and verify epochs are strictly increasing
803        for _ in 0..10 {
804            let tx = mgr.begin();
805            let epoch = mgr.commit(tx).unwrap();
806            epochs.push(epoch.as_u64());
807        }
808
809        // Verify strict monotonicity
810        for i in 1..epochs.len() {
811            assert!(
812                epochs[i] > epochs[i - 1],
813                "Epoch {} ({}) should be greater than epoch {} ({})",
814                i,
815                epochs[i],
816                i - 1,
817                epochs[i - 1]
818            );
819        }
820    }
821
822    #[test]
823    fn test_concurrent_commits_via_threads() {
824        use std::sync::Arc;
825        use std::thread;
826
827        let mgr = Arc::new(TransactionManager::new());
828        let num_threads = 10;
829        let commits_per_thread = 100;
830
831        let handles: Vec<_> = (0..num_threads)
832            .map(|_| {
833                let mgr = Arc::clone(&mgr);
834                thread::spawn(move || {
835                    let mut epochs = Vec::new();
836                    for _ in 0..commits_per_thread {
837                        let tx = mgr.begin();
838                        let epoch = mgr.commit(tx).unwrap();
839                        epochs.push(epoch.as_u64());
840                    }
841                    epochs
842                })
843            })
844            .collect();
845
846        let mut all_epochs: Vec<u64> = handles
847            .into_iter()
848            .flat_map(|h| h.join().unwrap())
849            .collect();
850
851        // All epochs should be unique (no duplicates)
852        all_epochs.sort_unstable();
853        let unique_count = all_epochs.len();
854        all_epochs.dedup();
855        assert_eq!(
856            all_epochs.len(),
857            unique_count,
858            "All commit epochs should be unique"
859        );
860
861        // Final epoch should equal number of commits
862        assert_eq!(
863            mgr.current_epoch().as_u64(),
864            (num_threads * commits_per_thread) as u64,
865            "Final epoch should equal total commits"
866        );
867    }
868
869    #[test]
870    fn test_isolation_level_default() {
871        let mgr = TransactionManager::new();
872
873        let tx = mgr.begin();
874        assert_eq!(
875            mgr.isolation_level(tx),
876            Some(IsolationLevel::SnapshotIsolation)
877        );
878    }
879
880    #[test]
881    fn test_isolation_level_explicit() {
882        let mgr = TransactionManager::new();
883
884        let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
885        let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
886        let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
887
888        assert_eq!(
889            mgr.isolation_level(transaction_rc),
890            Some(IsolationLevel::ReadCommitted)
891        );
892        assert_eq!(
893            mgr.isolation_level(transaction_si),
894            Some(IsolationLevel::SnapshotIsolation)
895        );
896        assert_eq!(
897            mgr.isolation_level(transaction_ser),
898            Some(IsolationLevel::Serializable)
899        );
900    }
901
902    #[test]
903    fn test_ssi_read_write_conflict_detected() {
904        let mgr = TransactionManager::new();
905
906        // tx1 starts with Serializable isolation
907        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
908
909        // tx2 starts and will modify an entity
910        let tx2 = mgr.begin();
911
912        // tx1 reads entity 42
913        let entity = NodeId::new(42);
914        mgr.record_read(tx1, entity).unwrap();
915
916        // tx2 writes to the same entity and commits
917        mgr.record_write(tx2, entity).unwrap();
918        mgr.commit(tx2).unwrap();
919
920        // tx1 tries to commit - should fail due to SSI read-write conflict
921        let result = mgr.commit(tx1);
922        assert!(result.is_err());
923        assert!(
924            result
925                .unwrap_err()
926                .to_string()
927                .contains("Serialization failure"),
928            "Expected serialization failure error"
929        );
930    }
931
932    #[test]
933    fn test_ssi_no_conflict_when_not_serializable() {
934        let mgr = TransactionManager::new();
935
936        // tx1 starts with default Snapshot Isolation
937        let tx1 = mgr.begin();
938
939        // tx2 starts and will modify an entity
940        let tx2 = mgr.begin();
941
942        // tx1 reads entity 42
943        let entity = NodeId::new(42);
944        mgr.record_read(tx1, entity).unwrap();
945
946        // tx2 writes to the same entity and commits
947        mgr.record_write(tx2, entity).unwrap();
948        mgr.commit(tx2).unwrap();
949
950        // tx1 should commit successfully (SI doesn't check read-write conflicts)
951        let result = mgr.commit(tx1);
952        assert!(
953            result.is_ok(),
954            "Snapshot Isolation should not detect read-write conflicts"
955        );
956    }
957
958    #[test]
959    fn test_ssi_no_conflict_when_write_before_read() {
960        let mgr = TransactionManager::new();
961
962        // tx1 writes and commits first
963        let tx1 = mgr.begin();
964        let entity = NodeId::new(42);
965        mgr.record_write(tx1, entity).unwrap();
966        mgr.commit(tx1).unwrap();
967
968        // tx2 starts AFTER tx1 committed and reads the entity
969        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
970        mgr.record_read(tx2, entity).unwrap();
971
972        // tx2 should commit successfully (tx1 committed before tx2 started)
973        let result = mgr.commit(tx2);
974        assert!(
975            result.is_ok(),
976            "Should not conflict when writer committed before reader started"
977        );
978    }
979
980    #[test]
981    fn test_write_skew_prevented_by_ssi() {
982        // Classic write skew scenario:
983        // Account A = 50, Account B = 50, constraint: A + B >= 0
984        // T1 reads A, B, writes A = A - 100
985        // T2 reads A, B, writes B = B - 100
986        // Without SSI, both could commit violating the constraint.
987
988        let mgr = TransactionManager::new();
989
990        let account_a = NodeId::new(1);
991        let account_b = NodeId::new(2);
992
993        // T1 and T2 both start with Serializable isolation
994        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
995        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
996
997        // Both read both accounts
998        mgr.record_read(tx1, account_a).unwrap();
999        mgr.record_read(tx1, account_b).unwrap();
1000        mgr.record_read(tx2, account_a).unwrap();
1001        mgr.record_read(tx2, account_b).unwrap();
1002
1003        // T1 writes to A, T2 writes to B (no write-write conflict)
1004        mgr.record_write(tx1, account_a).unwrap();
1005        mgr.record_write(tx2, account_b).unwrap();
1006
1007        // T1 commits first
1008        let result1 = mgr.commit(tx1);
1009        assert!(result1.is_ok(), "First commit should succeed");
1010
1011        // T2 tries to commit - should fail because it read account_a which T1 wrote
1012        let result2 = mgr.commit(tx2);
1013        assert!(result2.is_err(), "Second commit should fail due to SSI");
1014        assert!(
1015            result2
1016                .unwrap_err()
1017                .to_string()
1018                .contains("Serialization failure"),
1019            "Expected serialization failure error for write skew prevention"
1020        );
1021    }
1022
1023    #[test]
1024    fn test_read_committed_allows_non_repeatable_reads() {
1025        let mgr = TransactionManager::new();
1026
1027        // tx1 starts with ReadCommitted isolation
1028        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1029        let entity = NodeId::new(42);
1030
1031        // tx1 reads entity
1032        mgr.record_read(tx1, entity).unwrap();
1033
1034        // tx2 writes and commits
1035        let tx2 = mgr.begin();
1036        mgr.record_write(tx2, entity).unwrap();
1037        mgr.commit(tx2).unwrap();
1038
1039        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
1040        let result = mgr.commit(tx1);
1041        assert!(
1042            result.is_ok(),
1043            "ReadCommitted should allow non-repeatable reads"
1044        );
1045    }
1046
1047    #[test]
1048    fn test_isolation_level_debug() {
1049        assert_eq!(
1050            format!("{:?}", IsolationLevel::ReadCommitted),
1051            "ReadCommitted"
1052        );
1053        assert_eq!(
1054            format!("{:?}", IsolationLevel::SnapshotIsolation),
1055            "SnapshotIsolation"
1056        );
1057        assert_eq!(
1058            format!("{:?}", IsolationLevel::Serializable),
1059            "Serializable"
1060        );
1061    }
1062
1063    #[test]
1064    fn test_isolation_level_default_trait() {
1065        let default: IsolationLevel = Default::default();
1066        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1067    }
1068
1069    #[test]
1070    fn test_ssi_concurrent_reads_no_conflict() {
1071        let mgr = TransactionManager::new();
1072
1073        let entity = NodeId::new(42);
1074
1075        // Both transactions read the same entity
1076        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1077        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1078
1079        mgr.record_read(tx1, entity).unwrap();
1080        mgr.record_read(tx2, entity).unwrap();
1081
1082        // Both should commit successfully (read-read is not a conflict)
1083        assert!(mgr.commit(tx1).is_ok());
1084        assert!(mgr.commit(tx2).is_ok());
1085    }
1086
1087    #[test]
1088    fn test_ssi_write_write_conflict() {
1089        let mgr = TransactionManager::new();
1090
1091        let entity = NodeId::new(42);
1092
1093        // Both transactions attempt to write the same entity
1094        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1095        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1096
1097        // First writer succeeds
1098        mgr.record_write(tx1, entity).unwrap();
1099
1100        // Second writer is rejected immediately (first-writer-wins)
1101        let result = mgr.record_write(tx2, entity);
1102        assert!(
1103            result.is_err(),
1104            "Second record_write should fail with write-write conflict"
1105        );
1106
1107        // First commit succeeds
1108        assert!(mgr.commit(tx1).is_ok());
1109    }
1110}