Skip to main content

grafeo_engine/transaction/
manager.rs

1//! Transaction manager.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId};
7use grafeo_common::utils::error::{Error, Result, TransactionError};
8use grafeo_common::utils::hash::FxHashMap;
9use parking_lot::RwLock;
10
11/// State of a transaction.
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13#[non_exhaustive]
14pub enum TransactionState {
15    /// Transaction is active.
16    Active,
17    /// Transaction is committed.
18    Committed,
19    /// Transaction is aborted.
20    Aborted,
21}
22
23/// Transaction isolation level.
24///
25/// Controls the consistency guarantees and performance tradeoffs for transactions.
26///
27/// # Comparison
28///
29/// | Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads | Write Skew |
30/// |-------|-------------|----------------------|---------------|------------|
31/// | ReadCommitted | No | Yes | Yes | Yes |
32/// | SnapshotIsolation | No | No | No | Yes |
33/// | Serializable | No | No | No | No |
34///
35/// # Performance
36///
37/// Higher isolation levels require more bookkeeping:
38/// - `ReadCommitted`: Only tracks writes
39/// - `SnapshotIsolation`: Tracks writes + snapshot versioning
40/// - `Serializable`: Tracks writes + reads + SSI validation
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42#[non_exhaustive]
43pub enum IsolationLevel {
44    /// Read Committed: sees only committed data, but may see different
45    /// versions of the same row within a transaction.
46    ///
47    /// Lowest overhead, highest throughput, but weaker consistency.
48    ReadCommitted,
49
50    /// Snapshot Isolation (default): each transaction sees a consistent
51    /// snapshot as of transaction start. Prevents non-repeatable reads
52    /// and phantom reads.
53    ///
54    /// Vulnerable to write skew anomaly.
55    #[default]
56    SnapshotIsolation,
57
58    /// Serializable Snapshot Isolation (SSI): provides full serializability
59    /// by detecting read-write conflicts in addition to write-write conflicts.
60    ///
61    /// Prevents all anomalies including write skew, but may abort more
62    /// transactions due to stricter conflict detection.
63    Serializable,
64}
65
66/// Entity identifier for write tracking.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68#[non_exhaustive]
69pub enum EntityId {
70    /// A node.
71    Node(NodeId),
72    /// An edge.
73    Edge(EdgeId),
74}
75
76impl From<NodeId> for EntityId {
77    fn from(id: NodeId) -> Self {
78        Self::Node(id)
79    }
80}
81
82impl From<EdgeId> for EntityId {
83    fn from(id: EdgeId) -> Self {
84        Self::Edge(id)
85    }
86}
87
88/// Information about an active transaction.
89pub struct TransactionInfo {
90    /// Transaction state.
91    pub state: TransactionState,
92    /// Isolation level for this transaction.
93    pub isolation_level: IsolationLevel,
94    /// Start epoch (snapshot epoch for reads).
95    pub start_epoch: EpochId,
96    /// Set of entities written by this transaction.
97    pub write_set: HashSet<EntityId>,
98    /// Set of entities read by this transaction (for serializable isolation).
99    pub read_set: HashSet<EntityId>,
100}
101
102impl TransactionInfo {
103    /// Creates a new transaction info with the given isolation level.
104    fn new(start_epoch: EpochId, isolation_level: IsolationLevel) -> Self {
105        Self {
106            state: TransactionState::Active,
107            isolation_level,
108            start_epoch,
109            write_set: HashSet::new(),
110            read_set: HashSet::new(),
111        }
112    }
113}
114
115/// Manages transactions and MVCC versioning.
116pub struct TransactionManager {
117    /// Next transaction ID.
118    next_transaction_id: AtomicU64,
119    /// Current epoch.
120    current_epoch: AtomicU64,
121    /// Number of currently active transactions (for fast-path conflict skip).
122    active_count: AtomicU64,
123    /// Active transactions.
124    transactions: RwLock<FxHashMap<TransactionId, TransactionInfo>>,
125    /// Committed transaction epochs (for conflict detection).
126    /// Maps TransactionId -> commit epoch.
127    committed_epochs: RwLock<FxHashMap<TransactionId, EpochId>>,
128}
129
130impl TransactionManager {
131    /// Creates a new transaction manager.
132    #[must_use]
133    pub fn new() -> Self {
134        Self {
135            // Start at 2 to avoid collision with TransactionId::SYSTEM (which is 1)
136            // TransactionId::INVALID = u64::MAX, TransactionId::SYSTEM = 1, user transactions start at 2
137            next_transaction_id: AtomicU64::new(2),
138            current_epoch: AtomicU64::new(0),
139            active_count: AtomicU64::new(0),
140            transactions: RwLock::new(FxHashMap::default()),
141            committed_epochs: RwLock::new(FxHashMap::default()),
142        }
143    }
144
145    /// Begins a new transaction with the default isolation level (Snapshot Isolation).
146    pub fn begin(&self) -> TransactionId {
147        self.begin_with_isolation(IsolationLevel::default())
148    }
149
150    /// Begins a new transaction with the specified isolation level.
151    pub fn begin_with_isolation(&self, isolation_level: IsolationLevel) -> TransactionId {
152        let transaction_id =
153            TransactionId::new(self.next_transaction_id.fetch_add(1, Ordering::Relaxed));
154        let epoch = EpochId::new(self.current_epoch.load(Ordering::Acquire));
155
156        let info = TransactionInfo::new(epoch, isolation_level);
157        self.transactions.write().insert(transaction_id, info);
158        self.active_count.fetch_add(1, Ordering::Relaxed);
159        transaction_id
160    }
161
162    /// Returns the isolation level of a transaction.
163    pub fn isolation_level(&self, transaction_id: TransactionId) -> Option<IsolationLevel> {
164        self.transactions
165            .read()
166            .get(&transaction_id)
167            .map(|info| info.isolation_level)
168    }
169
170    /// Records a write operation for the transaction.
171    ///
172    /// Uses first-writer-wins: if another active transaction has already
173    /// written to the same entity, returns a write-write conflict error
174    /// immediately (before the caller mutates the store).
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if the transaction is not active or if another
179    /// active transaction has already written to the same entity.
180    pub fn record_write(
181        &self,
182        transaction_id: TransactionId,
183        entity: impl Into<EntityId>,
184    ) -> Result<()> {
185        let entity = entity.into();
186        let mut txns = self.transactions.write();
187
188        // First-writer-wins conflict detection. Skip the scan when only one
189        // transaction is active (common case for auto-commit).
190        if self.active_count.load(Ordering::Relaxed) > 1 {
191            for (other_tx, other_info) in txns.iter() {
192                if *other_tx != transaction_id
193                    && other_info.state == TransactionState::Active
194                    && other_info.write_set.contains(&entity)
195                {
196                    return Err(Error::Transaction(TransactionError::WriteConflict(
197                        format!("Write-write conflict on entity {entity:?}"),
198                    )));
199                }
200            }
201        }
202
203        // Single lookup: get_mut for both state check and write_set insert
204        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
205            Error::Transaction(TransactionError::InvalidState(
206                "Transaction not found".to_string(),
207            ))
208        })?;
209
210        if info.state != TransactionState::Active {
211            return Err(Error::Transaction(TransactionError::InvalidState(
212                "Transaction is not active".to_string(),
213            )));
214        }
215
216        info.write_set.insert(entity);
217        Ok(())
218    }
219
220    /// Records a read operation for the transaction (for serializable isolation).
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the transaction is not active.
225    pub fn record_read(
226        &self,
227        transaction_id: TransactionId,
228        entity: impl Into<EntityId>,
229    ) -> Result<()> {
230        let mut txns = self.transactions.write();
231        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
232            Error::Transaction(TransactionError::InvalidState(
233                "Transaction not found".to_string(),
234            ))
235        })?;
236
237        if info.state != TransactionState::Active {
238            return Err(Error::Transaction(TransactionError::InvalidState(
239                "Transaction is not active".to_string(),
240            )));
241        }
242
243        info.read_set.insert(entity.into());
244        Ok(())
245    }
246
247    /// Commits a transaction with conflict detection.
248    ///
249    /// # Conflict Detection
250    ///
251    /// - **All isolation levels**: Write-write conflicts (two transactions writing
252    ///   to the same entity) are always detected and cause the second committer to abort.
253    ///
254    /// - **Serializable only**: Read-write conflicts (SSI validation) are additionally
255    ///   checked. If transaction T1 read an entity that another transaction T2 wrote,
256    ///   and T2 committed after T1 started, T1 will abort. This prevents write skew.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if:
261    /// - The transaction is not active
262    /// - There's a write-write conflict with another committed transaction
263    /// - (Serializable only) There's a read-write conflict (SSI violation)
264    pub fn commit(&self, transaction_id: TransactionId) -> Result<EpochId> {
265        let mut txns = self.transactions.write();
266        let committed = self.committed_epochs.read();
267
268        // First, validate the transaction exists and is active
269        let (our_isolation, our_start_epoch, our_write_set, our_read_set) = {
270            let info = txns.get(&transaction_id).ok_or_else(|| {
271                Error::Transaction(TransactionError::InvalidState(
272                    "Transaction not found".to_string(),
273                ))
274            })?;
275
276            if info.state != TransactionState::Active {
277                return Err(Error::Transaction(TransactionError::InvalidState(
278                    "Transaction is not active".to_string(),
279                )));
280            }
281
282            (
283                info.isolation_level,
284                info.start_epoch,
285                info.write_set.clone(),
286                info.read_set.clone(),
287            )
288        };
289
290        // Check for write-write conflicts with transactions that committed
291        // after our snapshot (i.e., concurrent writers to the same entities).
292        // Transactions committed before our start_epoch are part of our visible
293        // snapshot, so overwriting their values is not a conflict.
294        for (other_tx, commit_epoch) in committed.iter() {
295            if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
296                // Check if that transaction wrote to any of our entities
297                if let Some(other_info) = txns.get(other_tx) {
298                    for entity in &our_write_set {
299                        if other_info.write_set.contains(entity) {
300                            return Err(Error::Transaction(TransactionError::WriteConflict(
301                                format!("Write-write conflict on entity {:?}", entity),
302                            )));
303                        }
304                    }
305                }
306            }
307        }
308
309        // SSI validation for Serializable isolation level
310        // Check for read-write conflicts: if we read an entity that another
311        // transaction (that committed after we started) wrote, we have a
312        // "rw-antidependency" which can cause write skew.
313        if our_isolation == IsolationLevel::Serializable && !our_read_set.is_empty() {
314            for (other_tx, commit_epoch) in committed.iter() {
315                if *other_tx != transaction_id && commit_epoch.as_u64() > our_start_epoch.as_u64() {
316                    // Check if that transaction wrote to any entity we read
317                    if let Some(other_info) = txns.get(other_tx) {
318                        for entity in &our_read_set {
319                            if other_info.write_set.contains(entity) {
320                                return Err(Error::Transaction(
321                                    TransactionError::SerializationFailure(format!(
322                                        "Read-write conflict on entity {:?}: \
323                                         another transaction modified data we read",
324                                        entity
325                                    )),
326                                ));
327                            }
328                        }
329                    }
330                }
331            }
332
333            // Also check against transactions that are already marked committed
334            // but not yet in committed_epochs map
335            for (other_tx, other_info) in txns.iter() {
336                if *other_tx == transaction_id {
337                    continue;
338                }
339                if other_info.state == TransactionState::Committed {
340                    // If we can see their write set and we read something they wrote
341                    for entity in &our_read_set {
342                        if other_info.write_set.contains(entity) {
343                            // Check if they committed after we started
344                            if let Some(commit_epoch) = committed.get(other_tx)
345                                && commit_epoch.as_u64() > our_start_epoch.as_u64()
346                            {
347                                return Err(Error::Transaction(
348                                    TransactionError::SerializationFailure(format!(
349                                        "Read-write conflict on entity {:?}: \
350                                             another transaction modified data we read",
351                                        entity
352                                    )),
353                                ));
354                            }
355                        }
356                    }
357                }
358            }
359        }
360
361        // Commit successful - advance epoch atomically
362        // SeqCst ensures all threads see commits in a consistent total order
363        let commit_epoch = EpochId::new(self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1);
364
365        // Now update state
366        if let Some(info) = txns.get_mut(&transaction_id) {
367            info.state = TransactionState::Committed;
368        }
369        self.active_count.fetch_sub(1, Ordering::Relaxed);
370
371        // Record commit epoch (need to drop read lock first)
372        drop(committed);
373        self.committed_epochs
374            .write()
375            .insert(transaction_id, commit_epoch);
376
377        Ok(commit_epoch)
378    }
379
380    /// Aborts a transaction.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the transaction is not active.
385    pub fn abort(&self, transaction_id: TransactionId) -> Result<()> {
386        let mut txns = self.transactions.write();
387
388        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
389            Error::Transaction(TransactionError::InvalidState(
390                "Transaction not found".to_string(),
391            ))
392        })?;
393
394        if info.state != TransactionState::Active {
395            return Err(Error::Transaction(TransactionError::InvalidState(
396                "Transaction is not active".to_string(),
397            )));
398        }
399
400        info.state = TransactionState::Aborted;
401        self.active_count.fetch_sub(1, Ordering::Relaxed);
402        Ok(())
403    }
404
405    /// Returns the write set of a transaction.
406    ///
407    /// This returns a copy of the entities written by this transaction,
408    /// used for rollback to discard uncommitted versions.
409    ///
410    /// # Errors
411    ///
412    /// Returns a `TransactionError::InvalidState` if the transaction is not found.
413    pub fn get_write_set(&self, transaction_id: TransactionId) -> Result<HashSet<EntityId>> {
414        let txns = self.transactions.read();
415        let info = txns.get(&transaction_id).ok_or_else(|| {
416            Error::Transaction(TransactionError::InvalidState(
417                "Transaction not found".to_string(),
418            ))
419        })?;
420        Ok(info.write_set.clone())
421    }
422
423    /// Replaces the write set of a transaction (used for savepoint rollback).
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the transaction is not found.
428    pub fn reset_write_set(
429        &self,
430        transaction_id: TransactionId,
431        write_set: HashSet<EntityId>,
432    ) -> Result<()> {
433        let mut txns = self.transactions.write();
434        let info = txns.get_mut(&transaction_id).ok_or_else(|| {
435            Error::Transaction(TransactionError::InvalidState(
436                "Transaction not found".to_string(),
437            ))
438        })?;
439        info.write_set = write_set;
440        Ok(())
441    }
442
443    /// Aborts all active transactions.
444    ///
445    /// Used during database shutdown.
446    pub fn abort_all_active(&self) {
447        let mut txns = self.transactions.write();
448        for info in txns.values_mut() {
449            if info.state == TransactionState::Active {
450                info.state = TransactionState::Aborted;
451                self.active_count.fetch_sub(1, Ordering::Relaxed);
452            }
453        }
454    }
455
456    /// Returns the state of a transaction.
457    pub fn state(&self, transaction_id: TransactionId) -> Option<TransactionState> {
458        self.transactions
459            .read()
460            .get(&transaction_id)
461            .map(|info| info.state)
462    }
463
464    /// Returns the start epoch of a transaction.
465    pub fn start_epoch(&self, transaction_id: TransactionId) -> Option<EpochId> {
466        self.transactions
467            .read()
468            .get(&transaction_id)
469            .map(|info| info.start_epoch)
470    }
471
472    /// Returns the current epoch.
473    #[must_use]
474    pub fn current_epoch(&self) -> EpochId {
475        EpochId::new(self.current_epoch.load(Ordering::Acquire))
476    }
477
478    /// Synchronizes the epoch counter to at least the given value.
479    ///
480    /// Used after snapshot import and WAL recovery to align the
481    /// TransactionManager epoch with the store epoch.
482    pub fn sync_epoch(&self, epoch: EpochId) {
483        self.current_epoch
484            .fetch_max(epoch.as_u64(), Ordering::SeqCst);
485    }
486
487    /// Returns the minimum epoch that must be preserved for active transactions.
488    ///
489    /// This is used for garbage collection - versions visible at this epoch
490    /// must be preserved.
491    #[must_use]
492    pub fn min_active_epoch(&self) -> EpochId {
493        let txns = self.transactions.read();
494        txns.values()
495            .filter(|info| info.state == TransactionState::Active)
496            .map(|info| info.start_epoch)
497            .min()
498            .unwrap_or_else(|| self.current_epoch())
499    }
500
501    /// Returns the number of active transactions.
502    #[must_use]
503    pub fn active_count(&self) -> usize {
504        self.transactions
505            .read()
506            .values()
507            .filter(|info| info.state == TransactionState::Active)
508            .count()
509    }
510
511    /// Cleans up completed transactions that are no longer needed for conflict detection.
512    ///
513    /// A committed transaction's write set must be preserved until all transactions
514    /// that started before its commit have completed. This ensures write-write
515    /// conflict detection works correctly.
516    ///
517    /// Returns the number of transactions cleaned up.
518    pub fn gc(&self) -> usize {
519        let mut txns = self.transactions.write();
520        let mut committed = self.committed_epochs.write();
521
522        // Find the minimum start epoch among active transactions
523        let min_active_start = txns
524            .values()
525            .filter(|info| info.state == TransactionState::Active)
526            .map(|info| info.start_epoch)
527            .min();
528
529        let initial_count = txns.len();
530
531        // Collect transactions safe to remove
532        let to_remove: Vec<TransactionId> = txns
533            .iter()
534            .filter(|(transaction_id, info)| {
535                match info.state {
536                    TransactionState::Active => false, // Never remove active transactions
537                    TransactionState::Aborted => true, // Always safe to remove aborted transactions
538                    TransactionState::Committed => {
539                        // Only remove committed transactions if their commit epoch
540                        // is older than all active transactions' start epochs
541                        if let Some(min_start) = min_active_start {
542                            if let Some(commit_epoch) = committed.get(*transaction_id) {
543                                // Safe to remove if committed before all active txns started
544                                commit_epoch.as_u64() < min_start.as_u64()
545                            } else {
546                                // No commit epoch recorded, keep it to be safe
547                                false
548                            }
549                        } else {
550                            // No active transactions, safe to remove all committed
551                            true
552                        }
553                    }
554                }
555            })
556            .map(|(id, _)| *id)
557            .collect();
558
559        for id in &to_remove {
560            txns.remove(id);
561            committed.remove(id);
562        }
563
564        initial_count - txns.len()
565    }
566
567    /// Marks a transaction as committed at a specific epoch.
568    ///
569    /// Used during recovery to restore transaction state.
570    pub fn mark_committed(&self, transaction_id: TransactionId, epoch: EpochId) {
571        self.committed_epochs.write().insert(transaction_id, epoch);
572    }
573
574    /// Returns the last assigned transaction ID.
575    ///
576    /// Returns `None` if no transactions have been started yet.
577    #[must_use]
578    pub fn last_assigned_transaction_id(&self) -> Option<TransactionId> {
579        let next = self.next_transaction_id.load(Ordering::Relaxed);
580        if next > 1 {
581            Some(TransactionId::new(next - 1))
582        } else {
583            None
584        }
585    }
586}
587
588impl Default for TransactionManager {
589    fn default() -> Self {
590        Self::new()
591    }
592}
593
594#[cfg(test)]
595mod tests {
596    use super::*;
597
598    #[test]
599    fn test_begin_commit() {
600        let mgr = TransactionManager::new();
601
602        let tx = mgr.begin();
603        assert_eq!(mgr.state(tx), Some(TransactionState::Active));
604
605        let commit_epoch = mgr.commit(tx).unwrap();
606        assert_eq!(mgr.state(tx), Some(TransactionState::Committed));
607        assert!(commit_epoch.as_u64() > 0);
608    }
609
610    #[test]
611    fn test_begin_abort() {
612        let mgr = TransactionManager::new();
613
614        let tx = mgr.begin();
615        mgr.abort(tx).unwrap();
616        assert_eq!(mgr.state(tx), Some(TransactionState::Aborted));
617    }
618
619    #[test]
620    fn test_epoch_advancement() {
621        let mgr = TransactionManager::new();
622
623        let initial_epoch = mgr.current_epoch();
624
625        let tx = mgr.begin();
626        let commit_epoch = mgr.commit(tx).unwrap();
627
628        assert!(mgr.current_epoch().as_u64() > initial_epoch.as_u64());
629        assert!(commit_epoch.as_u64() > initial_epoch.as_u64());
630    }
631
632    #[test]
633    fn test_gc_preserves_needed_write_sets() {
634        let mgr = TransactionManager::new();
635
636        let tx1 = mgr.begin();
637        let tx2 = mgr.begin();
638
639        mgr.commit(tx1).unwrap();
640        // tx2 still active - started before tx1 committed
641
642        assert_eq!(mgr.active_count(), 1);
643
644        // GC should NOT remove tx1 because tx2 might need its write set for conflict detection
645        let cleaned = mgr.gc();
646        assert_eq!(cleaned, 0);
647
648        // Both transactions should remain
649        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed));
650        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
651    }
652
653    #[test]
654    fn test_gc_removes_old_commits() {
655        let mgr = TransactionManager::new();
656
657        // tx1 commits at epoch 1
658        let tx1 = mgr.begin();
659        mgr.commit(tx1).unwrap();
660
661        // tx2 starts at epoch 1, commits at epoch 2
662        let tx2 = mgr.begin();
663        mgr.commit(tx2).unwrap();
664
665        // tx3 starts at epoch 2
666        let tx3 = mgr.begin();
667
668        // At this point:
669        // - tx1 committed at epoch 1, tx3 started at epoch 2 → tx1 commit < tx3 start → safe to GC
670        // - tx2 committed at epoch 2, tx3 started at epoch 2 → tx2 commit >= tx3 start → NOT safe
671        let cleaned = mgr.gc();
672        assert_eq!(cleaned, 1); // Only tx1 removed
673
674        assert_eq!(mgr.state(tx1), None);
675        assert_eq!(mgr.state(tx2), Some(TransactionState::Committed)); // Preserved for conflict detection
676        assert_eq!(mgr.state(tx3), Some(TransactionState::Active));
677
678        // After tx3 commits, tx2 can be GC'd
679        mgr.commit(tx3).unwrap();
680        let cleaned = mgr.gc();
681        assert_eq!(cleaned, 2); // tx2 and tx3 both cleaned (no active transactions)
682    }
683
684    #[test]
685    fn test_gc_removes_aborted() {
686        let mgr = TransactionManager::new();
687
688        let tx1 = mgr.begin();
689        let tx2 = mgr.begin();
690
691        mgr.abort(tx1).unwrap();
692        // tx2 still active
693
694        // Aborted transactions are always safe to remove
695        let cleaned = mgr.gc();
696        assert_eq!(cleaned, 1);
697
698        assert_eq!(mgr.state(tx1), None);
699        assert_eq!(mgr.state(tx2), Some(TransactionState::Active));
700    }
701
702    #[test]
703    fn test_write_tracking() {
704        let mgr = TransactionManager::new();
705
706        let tx = mgr.begin();
707
708        // Record writes
709        mgr.record_write(tx, NodeId::new(1)).unwrap();
710        mgr.record_write(tx, NodeId::new(2)).unwrap();
711        mgr.record_write(tx, EdgeId::new(100)).unwrap();
712
713        // Should commit successfully (no conflicts)
714        assert!(mgr.commit(tx).is_ok());
715    }
716
717    #[test]
718    fn test_min_active_epoch() {
719        let mgr = TransactionManager::new();
720
721        // No active transactions - should return current epoch
722        assert_eq!(mgr.min_active_epoch(), mgr.current_epoch());
723
724        // Start some transactions
725        let tx1 = mgr.begin();
726        let epoch1 = mgr.start_epoch(tx1).unwrap();
727
728        // Advance epoch
729        let tx2 = mgr.begin();
730        mgr.commit(tx2).unwrap();
731
732        let _tx3 = mgr.begin();
733
734        // min_active_epoch should be tx1's start epoch (earliest active)
735        assert_eq!(mgr.min_active_epoch(), epoch1);
736    }
737
738    #[test]
739    fn test_abort_all_active() {
740        let mgr = TransactionManager::new();
741
742        let tx1 = mgr.begin();
743        let tx2 = mgr.begin();
744        let tx3 = mgr.begin();
745
746        mgr.commit(tx1).unwrap();
747        // tx2 and tx3 still active
748
749        mgr.abort_all_active();
750
751        assert_eq!(mgr.state(tx1), Some(TransactionState::Committed)); // Already committed
752        assert_eq!(mgr.state(tx2), Some(TransactionState::Aborted));
753        assert_eq!(mgr.state(tx3), Some(TransactionState::Aborted));
754    }
755
756    #[test]
757    fn test_start_epoch_snapshot() {
758        let mgr = TransactionManager::new();
759
760        // Start epoch for tx1
761        let tx1 = mgr.begin();
762        let start1 = mgr.start_epoch(tx1).unwrap();
763
764        // Commit tx1, advancing epoch
765        mgr.commit(tx1).unwrap();
766
767        // Start tx2 after epoch advanced
768        let tx2 = mgr.begin();
769        let start2 = mgr.start_epoch(tx2).unwrap();
770
771        // tx2 should have a later start epoch
772        assert!(start2.as_u64() > start1.as_u64());
773    }
774
775    #[test]
776    fn test_write_write_conflict_detection() {
777        let mgr = TransactionManager::new();
778
779        // Both transactions start at the same epoch
780        let tx1 = mgr.begin();
781        let tx2 = mgr.begin();
782
783        // First writer succeeds
784        let entity = NodeId::new(42);
785        mgr.record_write(tx1, entity).unwrap();
786
787        // Second writer is rejected immediately (first-writer-wins)
788        let result = mgr.record_write(tx2, entity);
789        assert!(result.is_err());
790        assert!(
791            result
792                .unwrap_err()
793                .to_string()
794                .contains("Write-write conflict"),
795            "Expected write-write conflict error"
796        );
797
798        // First commit succeeds (no conflict at commit time either)
799        let result1 = mgr.commit(tx1);
800        assert!(result1.is_ok());
801    }
802
803    #[test]
804    fn test_commit_epoch_monotonicity() {
805        let mgr = TransactionManager::new();
806
807        let mut epochs = Vec::new();
808
809        // Commit multiple transactions and verify epochs are strictly increasing
810        for _ in 0..10 {
811            let tx = mgr.begin();
812            let epoch = mgr.commit(tx).unwrap();
813            epochs.push(epoch.as_u64());
814        }
815
816        // Verify strict monotonicity
817        for i in 1..epochs.len() {
818            assert!(
819                epochs[i] > epochs[i - 1],
820                "Epoch {} ({}) should be greater than epoch {} ({})",
821                i,
822                epochs[i],
823                i - 1,
824                epochs[i - 1]
825            );
826        }
827    }
828
829    #[test]
830    fn test_concurrent_commits_via_threads() {
831        use std::sync::Arc;
832        use std::thread;
833
834        let mgr = Arc::new(TransactionManager::new());
835        let num_threads = 10;
836        let commits_per_thread = 100;
837
838        let handles: Vec<_> = (0..num_threads)
839            .map(|_| {
840                let mgr = Arc::clone(&mgr);
841                thread::spawn(move || {
842                    let mut epochs = Vec::new();
843                    for _ in 0..commits_per_thread {
844                        let tx = mgr.begin();
845                        let epoch = mgr.commit(tx).unwrap();
846                        epochs.push(epoch.as_u64());
847                    }
848                    epochs
849                })
850            })
851            .collect();
852
853        let mut all_epochs: Vec<u64> = handles
854            .into_iter()
855            .flat_map(|h| h.join().unwrap())
856            .collect();
857
858        // All epochs should be unique (no duplicates)
859        all_epochs.sort_unstable();
860        let unique_count = all_epochs.len();
861        all_epochs.dedup();
862        assert_eq!(
863            all_epochs.len(),
864            unique_count,
865            "All commit epochs should be unique"
866        );
867
868        // Final epoch should equal number of commits
869        assert_eq!(
870            mgr.current_epoch().as_u64(),
871            (num_threads * commits_per_thread) as u64,
872            "Final epoch should equal total commits"
873        );
874    }
875
876    #[test]
877    fn test_isolation_level_default() {
878        let mgr = TransactionManager::new();
879
880        let tx = mgr.begin();
881        assert_eq!(
882            mgr.isolation_level(tx),
883            Some(IsolationLevel::SnapshotIsolation)
884        );
885    }
886
887    #[test]
888    fn test_isolation_level_explicit() {
889        let mgr = TransactionManager::new();
890
891        let transaction_rc = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
892        let transaction_si = mgr.begin_with_isolation(IsolationLevel::SnapshotIsolation);
893        let transaction_ser = mgr.begin_with_isolation(IsolationLevel::Serializable);
894
895        assert_eq!(
896            mgr.isolation_level(transaction_rc),
897            Some(IsolationLevel::ReadCommitted)
898        );
899        assert_eq!(
900            mgr.isolation_level(transaction_si),
901            Some(IsolationLevel::SnapshotIsolation)
902        );
903        assert_eq!(
904            mgr.isolation_level(transaction_ser),
905            Some(IsolationLevel::Serializable)
906        );
907    }
908
909    #[test]
910    fn test_ssi_read_write_conflict_detected() {
911        let mgr = TransactionManager::new();
912
913        // tx1 starts with Serializable isolation
914        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
915
916        // tx2 starts and will modify an entity
917        let tx2 = mgr.begin();
918
919        // tx1 reads entity 42
920        let entity = NodeId::new(42);
921        mgr.record_read(tx1, entity).unwrap();
922
923        // tx2 writes to the same entity and commits
924        mgr.record_write(tx2, entity).unwrap();
925        mgr.commit(tx2).unwrap();
926
927        // tx1 tries to commit - should fail due to SSI read-write conflict
928        let result = mgr.commit(tx1);
929        assert!(result.is_err());
930        assert!(
931            result
932                .unwrap_err()
933                .to_string()
934                .contains("Serialization failure"),
935            "Expected serialization failure error"
936        );
937    }
938
939    #[test]
940    fn test_ssi_no_conflict_when_not_serializable() {
941        let mgr = TransactionManager::new();
942
943        // tx1 starts with default Snapshot Isolation
944        let tx1 = mgr.begin();
945
946        // tx2 starts and will modify an entity
947        let tx2 = mgr.begin();
948
949        // tx1 reads entity 42
950        let entity = NodeId::new(42);
951        mgr.record_read(tx1, entity).unwrap();
952
953        // tx2 writes to the same entity and commits
954        mgr.record_write(tx2, entity).unwrap();
955        mgr.commit(tx2).unwrap();
956
957        // tx1 should commit successfully (SI doesn't check read-write conflicts)
958        let result = mgr.commit(tx1);
959        assert!(
960            result.is_ok(),
961            "Snapshot Isolation should not detect read-write conflicts"
962        );
963    }
964
965    #[test]
966    fn test_ssi_no_conflict_when_write_before_read() {
967        let mgr = TransactionManager::new();
968
969        // tx1 writes and commits first
970        let tx1 = mgr.begin();
971        let entity = NodeId::new(42);
972        mgr.record_write(tx1, entity).unwrap();
973        mgr.commit(tx1).unwrap();
974
975        // tx2 starts AFTER tx1 committed and reads the entity
976        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
977        mgr.record_read(tx2, entity).unwrap();
978
979        // tx2 should commit successfully (tx1 committed before tx2 started)
980        let result = mgr.commit(tx2);
981        assert!(
982            result.is_ok(),
983            "Should not conflict when writer committed before reader started"
984        );
985    }
986
987    #[test]
988    fn test_write_skew_prevented_by_ssi() {
989        // Classic write skew scenario:
990        // Account A = 50, Account B = 50, constraint: A + B >= 0
991        // T1 reads A, B, writes A = A - 100
992        // T2 reads A, B, writes B = B - 100
993        // Without SSI, both could commit violating the constraint.
994
995        let mgr = TransactionManager::new();
996
997        let account_a = NodeId::new(1);
998        let account_b = NodeId::new(2);
999
1000        // T1 and T2 both start with Serializable isolation
1001        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1002        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1003
1004        // Both read both accounts
1005        mgr.record_read(tx1, account_a).unwrap();
1006        mgr.record_read(tx1, account_b).unwrap();
1007        mgr.record_read(tx2, account_a).unwrap();
1008        mgr.record_read(tx2, account_b).unwrap();
1009
1010        // T1 writes to A, T2 writes to B (no write-write conflict)
1011        mgr.record_write(tx1, account_a).unwrap();
1012        mgr.record_write(tx2, account_b).unwrap();
1013
1014        // T1 commits first
1015        let result1 = mgr.commit(tx1);
1016        assert!(result1.is_ok(), "First commit should succeed");
1017
1018        // T2 tries to commit - should fail because it read account_a which T1 wrote
1019        let result2 = mgr.commit(tx2);
1020        assert!(result2.is_err(), "Second commit should fail due to SSI");
1021        assert!(
1022            result2
1023                .unwrap_err()
1024                .to_string()
1025                .contains("Serialization failure"),
1026            "Expected serialization failure error for write skew prevention"
1027        );
1028    }
1029
1030    #[test]
1031    fn test_read_committed_allows_non_repeatable_reads() {
1032        let mgr = TransactionManager::new();
1033
1034        // tx1 starts with ReadCommitted isolation
1035        let tx1 = mgr.begin_with_isolation(IsolationLevel::ReadCommitted);
1036        let entity = NodeId::new(42);
1037
1038        // tx1 reads entity
1039        mgr.record_read(tx1, entity).unwrap();
1040
1041        // tx2 writes and commits
1042        let tx2 = mgr.begin();
1043        mgr.record_write(tx2, entity).unwrap();
1044        mgr.commit(tx2).unwrap();
1045
1046        // tx1 can still commit (ReadCommitted allows non-repeatable reads)
1047        let result = mgr.commit(tx1);
1048        assert!(
1049            result.is_ok(),
1050            "ReadCommitted should allow non-repeatable reads"
1051        );
1052    }
1053
1054    #[test]
1055    fn test_isolation_level_debug() {
1056        assert_eq!(
1057            format!("{:?}", IsolationLevel::ReadCommitted),
1058            "ReadCommitted"
1059        );
1060        assert_eq!(
1061            format!("{:?}", IsolationLevel::SnapshotIsolation),
1062            "SnapshotIsolation"
1063        );
1064        assert_eq!(
1065            format!("{:?}", IsolationLevel::Serializable),
1066            "Serializable"
1067        );
1068    }
1069
1070    #[test]
1071    fn test_isolation_level_default_trait() {
1072        let default: IsolationLevel = Default::default();
1073        assert_eq!(default, IsolationLevel::SnapshotIsolation);
1074    }
1075
1076    #[test]
1077    fn test_ssi_concurrent_reads_no_conflict() {
1078        let mgr = TransactionManager::new();
1079
1080        let entity = NodeId::new(42);
1081
1082        // Both transactions read the same entity
1083        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1084        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1085
1086        mgr.record_read(tx1, entity).unwrap();
1087        mgr.record_read(tx2, entity).unwrap();
1088
1089        // Both should commit successfully (read-read is not a conflict)
1090        assert!(mgr.commit(tx1).is_ok());
1091        assert!(mgr.commit(tx2).is_ok());
1092    }
1093
1094    #[test]
1095    fn test_ssi_write_write_conflict() {
1096        let mgr = TransactionManager::new();
1097
1098        let entity = NodeId::new(42);
1099
1100        // Both transactions attempt to write the same entity
1101        let tx1 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1102        let tx2 = mgr.begin_with_isolation(IsolationLevel::Serializable);
1103
1104        // First writer succeeds
1105        mgr.record_write(tx1, entity).unwrap();
1106
1107        // Second writer is rejected immediately (first-writer-wins)
1108        let result = mgr.record_write(tx2, entity);
1109        assert!(
1110            result.is_err(),
1111            "Second record_write should fail with write-write conflict"
1112        );
1113
1114        // First commit succeeds
1115        assert!(mgr.commit(tx1).is_ok());
1116    }
1117}