Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
11/// State of a transaction.
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum TransactionState {
14    /// Transaction is active.
15    Active,
16    /// Transaction is committed.
17    Committed,
18    /// Transaction is aborted.
19    Aborted,
20}
21
22/// Transaction isolation level.
23///
24/// Controls the consistency guarantees and performance tradeoffs for transactions.
25///
26/// # Comparison
27///
28/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
29/// |-------|-------------|----------------------|---------------|------------|
30/// | ReadCommitted | No | Yes | Yes | Yes |
31/// | SnapshotIsolation | No | No | No | Yes |
32/// | Serializable | No | No | No | No |
33///
34/// # Performance
35///
36/// Higher isolation levels require more bookkeeping:
37/// - `ReadCommitted`: Only tracks writes
38/// - `SnapshotIsolation`: Tracks writes + snapshot versioning
39/// - `Serializable`: Tracks writes + reads + SSI validation
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
41pub enum IsolationLevel {
42    /// Read Committed: sees only committed data, but may see different
43    /// versions of the same row within a transaction.
44    ///
45    /// Lowest overhead, highest throughput, but weaker consistency.
46    ReadCommitted,
47
48    /// Snapshot Isolation (default): each transaction sees a consistent
49    /// snapshot as of transaction start. Prevents non-repeatable reads
50    /// and phantom reads.
51    ///
52    /// Vulnerable to write skew anomaly.
53    #[default]
54    SnapshotIsolation,
55
56    /// Serializable Snapshot Isolation (SSI): provides full serializability
57    /// by detecting read-write conflicts in addition to write-write conflicts.
58    ///
59    /// Prevents all anomalies including write skew, but may abort more
60    /// transactions due to stricter conflict detection.
61    Serializable,
62}
63
64/// Entity identifier for write tracking.
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum EntityId {
67    /// A node.
68    Node(NodeId),
69    /// An edge.
70    Edge(EdgeId),
71}
72
73impl From<NodeId> for EntityId {
74    fn from(id: NodeId) -> Self {
75        Self::Node(id)
76    }
77}
78
79impl From<EdgeId> for EntityId {
80    fn from(id: EdgeId) -> Self {
81        Self::Edge(id)
82    }
83}
84
85/// Information about an active transaction.
86pub struct TransactionInfo {
87    /// Transaction state.
88    pub state: TransactionState,
89    /// Isolation level for this transaction.
90    pub isolation_level: IsolationLevel,
91    /// Start epoch (snapshot epoch for reads).
92    pub start_epoch: EpochId,
93    /// Set of entities written by this transaction.
94    pub write_set: HashSet<EntityId>,
95    /// Set of entities read by this transaction (for serializable isolation).
96    pub read_set: HashSet<EntityId>,
97}
98
99impl TransactionInfo {
100    /// Creates a new transaction info with the given isolation level.
101    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
102        Self {
103            state: TransactionState::Active,
104            isolation_level,
105            start_epoch,
106            write_set: HashSet::new(),
107            read_set: HashSet::new(),
108        }
109    }
110}
111
112/// Manages transactions and MVCC versioning.
113pub struct TransactionManager {
114    /// Next transaction ID.
115    next_transaction_id: AtomicU64,
116    /// Current epoch.
117    current_epoch: AtomicU64,
118    /// Active transactions.
119    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
120    /// Committed transaction epochs (for conflict detection).
121    /// Maps TransactionId -> commit epoch.
122    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
123}
124
125impl TransactionManager {
126    /// Creates a new transaction manager.
127    #[must_use]
128    pub fn new() -> Self {
129        Self {
130            // Start at 2 to avoid collision with TransactionId::SYSTEM (which is 1)
131            // TransactionId::INVALID = u64::MAX, TransactionId::SYSTEM = 1, user transactions start at 2
132            next_transaction_id: AtomicU64::new(2),
133            current_epoch: AtomicU64::new(0),
134            transactions: RwLock::new(FxHashMap::default()),
135            committed_epochs: RwLock::new(FxHashMap::default()),
136        }
137    }
138
139    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
140    pub fn begin(&self) -> TransactionId {
141        self.begin_with_isolation(IsolationLevel::default())
142    }
143
144    /// Begins a new transaction with the specified isolation level.
145    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
146        let transaction_id =
147            TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
148        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
149
150        let info = TransactionInfo::new(epoch, isolation_level);
151        self.transactions.write().insert(transaction_id, info);
152        transaction_id
153    }
154
155    /// Returns the isolation level of a transaction.
156    pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
157        self.transactions
158            .read()
159            .get(&transaction_id)
160            .map(|info| info.isolation_level)
161    }
162
163    /// Records a write operation for the transaction.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the transaction is not active.
168    pub fn record_write(
169        &self,
170        transaction_id: TransactionId,
171        entity: impl Into<EntityId>,
172    ) -> Result<()> {
173        let mut txns = self.transactions.write();
174        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
175            Error::Transaction(TransactionError::InvalidState(
176                "Transaction not found".to_string(),
177            ))
178        })?;
179
180        if info.state != TransactionState::Active {
181            return Err(Error::Transaction(TransactionError::InvalidState(
182                "Transaction is not active".to_string(),
183            )));
184        }
185
186        info.write_set.insert(entity.into());
187        Ok(())
188    }
189
190    /// Records a read operation for the transaction (for serializable isolation).
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the transaction is not active.
195    pub fn record_read(
196        &self,
197        transaction_id: TransactionId,
198        entity: impl Into<EntityId>,
199    ) -> Result<()> {
200        let mut txns = self.transactions.write();
201        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
202            Error::Transaction(TransactionError::InvalidState(
203                "Transaction not found".to_string(),
204            ))
205        })?;
206
207        if info.state != TransactionState::Active {
208            return Err(Error::Transaction(TransactionError::InvalidState(
209                "Transaction is not active".to_string(),
210            )));
211        }
212
213        info.read_set.insert(entity.into());
214        Ok(())
215    }
216
217    /// Commits a transaction with conflict detection.
218    ///
219    /// # Conflict Detection
220    ///
221    /// - **All isolation levels**: Write-write conflicts (two transactions writing
222    ///   to the same entity) are always detected and cause the second committer to abort.
223    ///
224    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
225    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
226    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if:
231    /// - The transaction is not active
232    /// - There's a write-write conflict with another committed transaction
233    /// - (Serializable only) There's a read-write conflict (SSI violation)
234    pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
235        let mut txns = self.transactions.write();
236        let committed = self.committed_epochs.read();
237
238        // First, validate the transaction exists and is active
239        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
240            let info = txns.get(&transaction_id).ok_or_else(|| {
241                Error::Transaction(TransactionError::InvalidState(
242                    "Transaction not found".to_string(),
243                ))
244            })?;
245
246            if info.state != TransactionState::Active {
247                return Err(Error::Transaction(TransactionError::InvalidState(
248                    "Transaction is not active".to_string(),
249                )));
250            }
251
252            (
253                info.isolation_level,
254                info.start_epoch,
255                info.write_set.clone(),
256                info.read_set.clone(),
257            )
258        };
259
260        // Check for write-write conflicts with other committed transactions
261        for (other_tx, other_info) in txns.iter() {
262            if *other_tx == transaction_id {
263                continue;
264            }
265            if other_info.state == TransactionState::Committed {
266                // Check if any of our writes conflict with their writes
267                for entity in &our_write_set {
268                    if other_info.write_set.contains(entity) {
269                        return Err(Error::Transaction(TransactionError::WriteConflict(
270                            format!("Write-write conflict on entity {:?}", entity),
271                        )));
272                    }
273                }
274            }
275        }
276
277        // Also check against recently committed transactions
278        for (other_tx, commit_epoch) in committed.iter() {
279            if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
280                // Check if that transaction wrote to any of our entities
281                if let Some(other_info) = txns.get(other_tx) {
282                    for entity in &our_write_set {
283                        if other_info.write_set.contains(entity) {
284                            return Err(Error::Transaction(TransactionError::WriteConflict(
285                                format!("Write-write conflict on entity {:?}", entity),
286                            )));
287                        }
288                    }
289                }
290            }
291        }
292
293        // SSI validation for Serializable isolation level
294        // Check for read-write conflicts: if we read an entity that another
295        // transaction (that committed after we started) wrote, we have a
296        // "rw-antidependency" which can cause write skew.
297        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
298            for (other_tx, commit_epoch) in committed.iter() {
299                if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
300                    // Check if that transaction wrote to any entity we read
301                    if let Some(other_info) = txns.get(other_tx) {
302                        for entity in &our_read_set {
303                            if other_info.write_set.contains(entity) {
304                                return Err(Error::Transaction(
305                                    TransactionError::SerializationFailure(format!(
306                                        "Read-write conflict on entity {:?}: \
307                                         another transaction modified data we read",
308                                        entity
309                                    )),
310                                ));
311                            }
312                        }
313                    }
314                }
315            }
316
317            // Also check against transactions that are already marked committed
318            // but not yet in committed_epochs map
319            for (other_tx, other_info) in txns.iter() {
320                if *other_tx == transaction_id {
321                    continue;
322                }
323                if other_info.state == TransactionState::Committed {
324                    // If we can see their write set and we read something they wrote
325                    for entity in &our_read_set {
326                        if other_info.write_set.contains(entity) {
327                            // Check if they committed after we started
328                            if let Some(commit_epoch) = committed.get(other_tx)
329                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
330                            {
331                                return Err(Error::Transaction(
332                                    TransactionError::SerializationFailure(format!(
333                                        "Read-write conflict on entity {:?}: \
334                                             another transaction modified data we read",
335                                        entity
336                                    )),
337                                ));
338                            }
339                        }
340                    }
341                }
342            }
343        }
344
345        // Commit successful - advance epoch atomically
346        // SeqCst ensures all threads see commits in a consistent total order
347        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
348
349        // Now update state
350        if let Some(info) = txns.get_mut(&transaction_id) {
351            info.state = TransactionState::Committed;
352        }
353
354        // Record commit epoch (need to drop read lock first)
355        drop(committed);
356        self.committed_epochs
357            .write()
358            .insert(transaction_id, commit_epoch);
359
360        Ok(commit_epoch)
361    }
362
363    /// Aborts a transaction.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if the transaction is not active.
368    pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
369        let mut txns = self.transactions.write();
370
371        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
372            Error::Transaction(TransactionError::InvalidState(
373                "Transaction not found".to_string(),
374            ))
375        })?;
376
377        if info.state != TransactionState::Active {
378            return Err(Error::Transaction(TransactionError::InvalidState(
379                "Transaction is not active".to_string(),
380            )));
381        }
382
383        info.state = TransactionState::Aborted;
384        Ok(())
385    }
386
387    /// Returns the write set of a transaction.
388    ///
389    /// This returns a copy of the entities written by this transaction,
390    /// used for rollback to discard uncommitted versions.
391    pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
392        let txns = self.transactions.read();
393        let info = txns.get(&transaction_id).ok_or_else(|| {
394            Error::Transaction(TransactionError::InvalidState(
395                "Transaction not found".to_string(),
396            ))
397        })?;
398        Ok(info.write_set.clone())
399    }
400
401    /// Replaces the write set of a transaction (used for savepoint rollback).
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the transaction is not found.
406    pub fn reset_write_set(
407        &self,
408        transaction_id: TransactionId,
409        write_set: HashSet<EntityId>,
410    ) -> Result<()> {
411        let mut txns = self.transactions.write();
412        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
413            Error::Transaction(TransactionError::InvalidState(
414                "Transaction not found".to_string(),
415            ))
416        })?;
417        info.write_set = write_set;
418        Ok(())
419    }
420
421    /// Aborts all active transactions.
422    ///
423    /// Used during database shutdown.
424    pub fn abort_all_active(&self) {
425        let mut txns = self.transactions.write();
426        for info in txns.values_mut() {
427            if info.state == TransactionState::Active {
428                info.state = TransactionState::Aborted;
429            }
430        }
431    }
432
433    /// Returns the state of a transaction.
434    pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
435        self.transactions
436            .read()
437            .get(&transaction_id)
438            .map(|info| info.state)
439    }
440
441    /// Returns the start epoch of a transaction.
442    pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
443        self.transactions
444            .read()
445            .get(&transaction_id)
446            .map(|info| info.start_epoch)
447    }
448
449    /// Returns the current epoch.
450    #[must_use]
451    pub fn current_epoch(&self) -> EpochId {
452        EpochId::new(self.current_epoch.load(Ordering::Acquire))
453    }
454
455    /// Returns the minimum epoch that must be preserved for active transactions.
456    ///
457    /// This is used for garbage collection - versions visible at this epoch
458    /// must be preserved.
459    #[must_use]
460    pub fn min_active_epoch(&self) -> EpochId {
461        let txns = self.transactions.read();
462        txns.values()
463            .filter(|info| info.state == TransactionState::Active)
464            .map(|info| info.start_epoch)
465            .min()
466            .unwrap_or_else(|| self.current_epoch())
467    }
468
469    /// Returns the number of active transactions.
470    #[must_use]
471    pub fn active_count(&self) -> usize {
472        self.transactions
473            .read()
474            .values()
475            .filter(|info| info.state == TransactionState::Active)
476            .count()
477    }
478
479    /// Cleans up completed transactions that are no longer needed for conflict detection.
480    ///
481    /// A committed transaction's write set must be preserved until all transactions
482    /// that started before its commit have completed. This ensures write-write
483    /// conflict detection works correctly.
484    ///
485    /// Returns the number of transactions cleaned up.
486    pub fn gc(&self) -> usize {
487        let mut txns = self.transactions.write();
488        let mut committed = self.committed_epochs.write();
489
490        // Find the minimum start epoch among active transactions
491        let min_active_start = txns
492            .values()
493            .filter(|info| info.state == TransactionState::Active)
494            .map(|info| info.start_epoch)
495            .min();
496
497        let initial_count = txns.len();
498
499        // Collect transactions safe to remove
500        let to_remove: Vec<TransactionId> = txns
501            .iter()
502            .filter(|(transaction_id, info)| {
503                match info.state {
504                    TransactionState::Active => false, // Never remove active transactions
505                    TransactionState::Aborted => true, // Always safe to remove aborted transactions
506                    TransactionState::Committed => {
507                        // Only remove committed transactions if their commit epoch
508                        // is older than all active transactions' start epochs
509                        if let Some(min_start) = min_active_start {
510                            if let Some(commit_epoch) = committed.get(*transaction_id) {
511                                // Safe to remove if committed before all active txns started
512                                commit_epoch.as_u64() < min_start.as_u64()
513                            } else {
514                                // No commit epoch recorded, keep it to be safe
515                                false
516                            }
517                        } else {
518                            // No active transactions, safe to remove all committed
519                            true
520                        }
521                    }
522                }
523            })
524            .map(|(id, _)| *id)
525            .collect();
526
527        for id in &to_remove {
528            txns.remove(id);
529            committed.remove(id);
530        }
531
532        initial_count - txns.len()
533    }
534
535    /// Marks a transaction as committed at a specific epoch.
536    ///
537    /// Used during recovery to restore transaction state.
538    pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
539        self.committed_epochs.write().insert(transaction_id, epoch);
540    }
541
542    /// Returns the last assigned transaction ID.
543    ///
544    /// Returns `None` if no transactions have been started yet.
545    #[must_use]
546    pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
547        let next = self.next_transaction_id.load(Ordering::Relaxed);
548        if next > 1 {
549            Some(TransactionId::new(next - 1))
550        } else {
551            None
552        }
553    }
554}
555
556impl Default for TransactionManager {
557    fn default() -> Self {
558        Self::new()
559    }
560}
561
562#[cfg(test)]
563mod tests {
564    use super::*;
565
566    #[test]
567    fn test_begin_commit() {
568        let mgr = TransactionManager::new();
569
570        let tx = mgr.begin();
571        assert_eq!(mgr.state(tx), Some(TransactionState::Active));
572
573        let commit_epoch = mgr.commit(tx).unwrap();
574        assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
575        assert!(commit_epoch.as_u64() > 0);
576    }
577
578    #[test]
579    fn test_begin_abort() {
580        let mgr = TransactionManager::new();
581
582        let tx = mgr.begin();
583        mgr.abort(tx).unwrap();
584        assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
585    }
586
587    #[test]
588    fn test_epoch_advancement() {
589        let mgr = TransactionManager::new();
590
591        let initial_epoch = mgr.current_epoch();
592
593        let tx = mgr.begin();
594        let commit_epoch = mgr.commit(tx).unwrap();
595
596        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
597        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
598    }
599
600    #[test]
601    fn test_gc_preserves_needed_write_sets() {
602        let mgr = TransactionManager::new();
603
604        let tx1 = mgr.begin();
605        let tx2 = mgr.begin();
606
607        mgr.commit(tx1).unwrap();
608        // tx2 still active - started before tx1 committed
609
610        assert_eq!(mgr.active_count(), 1);
611
612        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
613        let cleaned = mgr.gc();
614        assert_eq!(cleaned, 0);
615
616        // Both transactions should remain
617        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
618        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
619    }
620
621    #[test]
622    fn test_gc_removes_old_commits() {
623        let mgr = TransactionManager::new();
624
625        // tx1 commits at epoch 1
626        let tx1 = mgr.begin();
627        mgr.commit(tx1).unwrap();
628
629        // tx2 starts at epoch 1, commits at epoch 2
630        let tx2 = mgr.begin();
631        mgr.commit(tx2).unwrap();
632
633        // tx3 starts at epoch 2
634        let tx3 = mgr.begin();
635
636        // At this point:
637        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
638        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
639        let cleaned = mgr.gc();
640        assert_eq!(cleaned, 1); // Only tx1 removed
641
642        assert_eq!(mgr.state(tx1), None);
643        assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); // Preserved for conflict detection
644        assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
645
646        // After tx3 commits, tx2 can be GC'd
647        mgr.commit(tx3).unwrap();
648        let cleaned = mgr.gc();
649        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
650    }
651
652    #[test]
653    fn test_gc_removes_aborted() {
654        let mgr = TransactionManager::new();
655
656        let tx1 = mgr.begin();
657        let tx2 = mgr.begin();
658
659        mgr.abort(tx1).unwrap();
660        // tx2 still active
661
662        // Aborted transactions are always safe to remove
663        let cleaned = mgr.gc();
664        assert_eq!(cleaned, 1);
665
666        assert_eq!(mgr.state(tx1), None);
667        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
668    }
669
670    #[test]
671    fn test_write_tracking() {
672        let mgr = TransactionManager::new();
673
674        let tx = mgr.begin();
675
676        // Record writes
677        mgr.record_write(tx, NodeId::new(1)).unwrap();
678        mgr.record_write(tx, NodeId::new(2)).unwrap();
679        mgr.record_write(tx, EdgeId::new(100)).unwrap();
680
681        // Should commit successfully (no conflicts)
682        assert!(mgr.commit(tx).is_ok());
683    }
684
685    #[test]
686    fn test_min_active_epoch() {
687        let mgr = TransactionManager::new();
688
689        // No active transactions - should return current epoch
690        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
691
692        // Start some transactions
693        let tx1 = mgr.begin();
694        let epoch1 = mgr.start_epoch(tx1).unwrap();
695
696        // Advance epoch
697        let tx2 = mgr.begin();
698        mgr.commit(tx2).unwrap();
699
700        let _tx3 = mgr.begin();
701
702        // min_active_epoch should be tx1's start epoch (earliest active)
703        assert_eq!(mgr.min_active_epoch(), epoch1);
704    }
705
706    #[test]
707    fn test_abort_all_active() {
708        let mgr = TransactionManager::new();
709
710        let tx1 = mgr.begin();
711        let tx2 = mgr.begin();
712        let tx3 = mgr.begin();
713
714        mgr.commit(tx1).unwrap();
715        // tx2 and tx3 still active
716
717        mgr.abort_all_active();
718
719        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); // Already committed
720        assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
721        assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
722    }
723
724    #[test]
725    fn test_start_epoch_snapshot() {
726        let mgr = TransactionManager::new();
727
728        // Start epoch for tx1
729        let tx1 = mgr.begin();
730        let start1 = mgr.start_epoch(tx1).unwrap();
731
732        // Commit tx1, advancing epoch
733        mgr.commit(tx1).unwrap();
734
735        // Start tx2 after epoch advanced
736        let tx2 = mgr.begin();
737        let start2 = mgr.start_epoch(tx2).unwrap();
738
739        // tx2 should have a later start epoch
740        assert!(start2.as_u64() > start1.as_u64());
741    }
742
743    #[test]
744    fn test_write_write_conflict_detection() {
745        let mgr = TransactionManager::new();
746
747        // Both transactions start at the same epoch
748        let tx1 = mgr.begin();
749        let tx2 = mgr.begin();
750
751        // Both try to write to the same entity
752        let entity = NodeId::new(42);
753        mgr.record_write(tx1, entity).unwrap();
754        mgr.record_write(tx2, entity).unwrap();
755
756        // First commit succeeds
757        let result1 = mgr.commit(tx1);
758        assert!(result1.is_ok());
759
760        // Second commit should fail due to write-write conflict
761        let result2 = mgr.commit(tx2);
762        assert!(result2.is_err());
763        assert!(
764            result2
765                .unwrap_err()
766                .to_string()
767                .contains("Write-write conflict"),
768            "Expected write-write conflict error"
769        );
770    }
771
772    #[test]
773    fn test_commit_epoch_monotonicity() {
774        let mgr = TransactionManager::new();
775
776        let mut epochs = Vec::new();
777
778        // Commit multiple transactions and verify epochs are strictly increasing
779        for _ in 0..10 {
780            let tx = mgr.begin();
781            let epoch = mgr.commit(tx).unwrap();
782            epochs.push(epoch.as_u64());
783        }
784
785        // Verify strict monotonicity
786        for i in 1..epochs.len() {
787            assert!(
788                epochs[i] > epochs[i - 1],
789                "Epoch {} ({}) should be greater than epoch {} ({})",
790                i,
791                epochs[i],
792                i - 1,
793                epochs[i - 1]
794            );
795        }
796    }
797
798    #[test]
799    fn test_concurrent_commits_via_threads() {
800        use std::sync::Arc;
801        use std::thread;
802
803        let mgr = Arc::new(TransactionManager::new());
804        let num_threads = 10;
805        let commits_per_thread = 100;
806
807        let handles: Vec<_> = (0..num_threads)
808            .map(|_| {
809                let mgr = Arc::clone(&mgr);
810                thread::spawn(move || {
811                    let mut epochs = Vec::new();
812                    for _ in 0..commits_per_thread {
813                        let tx = mgr.begin();
814                        let epoch = mgr.commit(tx).unwrap();
815                        epochs.push(epoch.as_u64());
816                    }
817                    epochs
818                })
819            })
820            .collect();
821
822        let mut all_epochs: Vec<u64> = handles
823            .into_iter()
824            .flat_map(|h| h.join().unwrap())
825            .collect();
826
827        // All epochs should be unique (no duplicates)
828        all_epochs.sort_unstable();
829        let unique_count = all_epochs.len();
830        all_epochs.dedup();
831        assert_eq!(
832            all_epochs.len(),
833            unique_count,
834            "All commit epochs should be unique"
835        );
836
837        // Final epoch should equal number of commits
838        assert_eq!(
839            mgr.current_epoch().as_u64(),
840            (num_threads * commits_per_thread) as u64,
841            "Final epoch should equal total commits"
842        );
843    }
844
845    #[test]
846    fn test_isolation_level_default() {
847        let mgr = TransactionManager::new();
848
849        let tx = mgr.begin();
850        assert_eq!(
851            mgr.isolation_level(tx),
852            Some(IsolationLevel::SnapshotIsolation)
853        );
854    }
855
856    #[test]
857    fn test_isolation_level_explicit() {
858        let mgr = TransactionManager::new();
859
860        let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
861        let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
862        let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
863
864        assert_eq!(
865            mgr.isolation_level(transaction_rc),
866            Some(IsolationLevel::ReadCommitted)
867        );
868        assert_eq!(
869            mgr.isolation_level(transaction_si),
870            Some(IsolationLevel::SnapshotIsolation)
871        );
872        assert_eq!(
873            mgr.isolation_level(transaction_ser),
874            Some(IsolationLevel::Serializable)
875        );
876    }
877
878    #[test]
879    fn test_ssi_read_write_conflict_detected() {
880        let mgr = TransactionManager::new();
881
882        // tx1 starts with Serializable isolation
883        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
884
885        // tx2 starts and will modify an entity
886        let tx2 = mgr.begin();
887
888        // tx1 reads entity 42
889        let entity = NodeId::new(42);
890        mgr.record_read(tx1, entity).unwrap();
891
892        // tx2 writes to the same entity and commits
893        mgr.record_write(tx2, entity).unwrap();
894        mgr.commit(tx2).unwrap();
895
896        // tx1 tries to commit - should fail due to SSI read-write conflict
897        let result = mgr.commit(tx1);
898        assert!(result.is_err());
899        assert!(
900            result
901                .unwrap_err()
902                .to_string()
903                .contains("Serialization failure"),
904            "Expected serialization failure error"
905        );
906    }
907
908    #[test]
909    fn test_ssi_no_conflict_when_not_serializable() {
910        let mgr = TransactionManager::new();
911
912        // tx1 starts with default Snapshot Isolation
913        let tx1 = mgr.begin();
914
915        // tx2 starts and will modify an entity
916        let tx2 = mgr.begin();
917
918        // tx1 reads entity 42
919        let entity = NodeId::new(42);
920        mgr.record_read(tx1, entity).unwrap();
921
922        // tx2 writes to the same entity and commits
923        mgr.record_write(tx2, entity).unwrap();
924        mgr.commit(tx2).unwrap();
925
926        // tx1 should commit successfully (SI doesn't check read-write conflicts)
927        let result = mgr.commit(tx1);
928        assert!(
929            result.is_ok(),
930            "Snapshot Isolation should not detect read-write conflicts"
931        );
932    }
933
934    #[test]
935    fn test_ssi_no_conflict_when_write_before_read() {
936        let mgr = TransactionManager::new();
937
938        // tx1 writes and commits first
939        let tx1 = mgr.begin();
940        let entity = NodeId::new(42);
941        mgr.record_write(tx1, entity).unwrap();
942        mgr.commit(tx1).unwrap();
943
944        // tx2 starts AFTER tx1 committed and reads the entity
945        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
946        mgr.record_read(tx2, entity).unwrap();
947
948        // tx2 should commit successfully (tx1 committed before tx2 started)
949        let result = mgr.commit(tx2);
950        assert!(
951            result.is_ok(),
952            "Should not conflict when writer committed before reader started"
953        );
954    }
955
956    #[test]
957    fn test_write_skew_prevented_by_ssi() {
958        // Classic write skew scenario:
959        // Account A = 50, Account B = 50, constraint: A + B >= 0
960        // T1 reads A, B, writes A = A - 100
961        // T2 reads A, B, writes B = B - 100
962        // Without SSI, both could commit violating the constraint.
963
964        let mgr = TransactionManager::new();
965
966        let account_a = NodeId::new(1);
967        let account_b = NodeId::new(2);
968
969        // T1 and T2 both start with Serializable isolation
970        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
971        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
972
973        // Both read both accounts
974        mgr.record_read(tx1, account_a).unwrap();
975        mgr.record_read(tx1, account_b).unwrap();
976        mgr.record_read(tx2, account_a).unwrap();
977        mgr.record_read(tx2, account_b).unwrap();
978
979        // T1 writes to A, T2 writes to B (no write-write conflict)
980        mgr.record_write(tx1, account_a).unwrap();
981        mgr.record_write(tx2, account_b).unwrap();
982
983        // T1 commits first
984        let result1 = mgr.commit(tx1);
985        assert!(result1.is_ok(), "First commit should succeed");
986
987        // T2 tries to commit - should fail because it read account_a which T1 wrote
988        let result2 = mgr.commit(tx2);
989        assert!(result2.is_err(), "Second commit should fail due to SSI");
990        assert!(
991            result2
992                .unwrap_err()
993                .to_string()
994                .contains("Serialization failure"),
995            "Expected serialization failure error for write skew prevention"
996        );
997    }
998
999    #[test]
1000    fn test_read_committed_allows_non_repeatable_reads() {
1001        let mgr = TransactionManager::new();
1002
1003        // tx1 starts with ReadCommitted isolation
1004        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1005        let entity = NodeId::new(42);
1006
1007        // tx1 reads entity
1008        mgr.record_read(tx1, entity).unwrap();
1009
1010        // tx2 writes and commits
1011        let tx2 = mgr.begin();
1012        mgr.record_write(tx2, entity).unwrap();
1013        mgr.commit(tx2).unwrap();
1014
1015        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
1016        let result = mgr.commit(tx1);
1017        assert!(
1018            result.is_ok(),
1019            "ReadCommitted should allow non-repeatable reads"
1020        );
1021    }
1022
1023    #[test]
1024    fn test_isolation_level_debug() {
1025        assert_eq!(
1026            format!("{:?}", IsolationLevel::ReadCommitted),
1027            "ReadCommitted"
1028        );
1029        assert_eq!(
1030            format!("{:?}", IsolationLevel::SnapshotIsolation),
1031            "SnapshotIsolation"
1032        );
1033        assert_eq!(
1034            format!("{:?}", IsolationLevel::Serializable),
1035            "Serializable"
1036        );
1037    }
1038
1039    #[test]
1040    fn test_isolation_level_default_trait() {
1041        let default: IsolationLevel = Default::default();
1042        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1043    }
1044
1045    #[test]
1046    fn test_ssi_concurrent_reads_no_conflict() {
1047        let mgr = TransactionManager::new();
1048
1049        let entity = NodeId::new(42);
1050
1051        // Both transactions read the same entity
1052        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1053        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1054
1055        mgr.record_read(tx1, entity).unwrap();
1056        mgr.record_read(tx2, entity).unwrap();
1057
1058        // Both should commit successfully (read-read is not a conflict)
1059        assert!(mgr.commit(tx1).is_ok());
1060        assert!(mgr.commit(tx2).is_ok());
1061    }
1062
1063    #[test]
1064    fn test_ssi_write_write_conflict() {
1065        let mgr = TransactionManager::new();
1066
1067        let entity = NodeId::new(42);
1068
1069        // Both transactions write the same entity
1070        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1071        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1072
1073        mgr.record_write(tx1, entity).unwrap();
1074        mgr.record_write(tx2, entity).unwrap();
1075
1076        // First commit succeeds
1077        assert!(mgr.commit(tx1).is_ok());
1078
1079        // Second commit fails (write-write conflict)
1080        let result = mgr.commit(tx2);
1081        assert!(result.is_err());
1082    }
1083}