Skip to main content

featherdb_mvcc/
transaction.rs

1//! Transaction management
2
3use crate::{Snapshot, VersionStore};
4use featherdb_core::{Error, Lsn, PageId, Result, TransactionConfig, TransactionId};
5use featherdb_storage::Wal;
6use parking_lot::{Mutex, RwLock};
7use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::time::{Duration, Instant};
10
11/// Transaction mode
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum TransactionMode {
14    /// Read-only transaction
15    ReadOnly,
16    /// Read-write transaction
17    ReadWrite,
18}
19
20/// Transaction status
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum TransactionStatus {
23    /// Transaction is active
24    Active,
25    /// Transaction is committing
26    Committing,
27    /// Transaction has committed
28    Committed,
29    /// Transaction has been aborted
30    Aborted,
31}
32
33/// Information about a transaction
34#[derive(Debug, Clone)]
35pub struct TransactionInfo {
36    /// Transaction ID
37    pub id: u64,
38    /// Age of the transaction (time since it started)
39    pub age: Duration,
40}
41
42/// Status of garbage collection
43#[derive(Debug, Clone)]
44pub struct GcStatus {
45    /// ID of the oldest active transaction (blocks GC)
46    pub oldest_active_txn: Option<u64>,
47    /// Whether GC is currently blocked
48    pub blocked: bool,
49    /// Reason GC is blocked (if blocked)
50    pub blocked_reason: Option<String>,
51}
52
53/// Transaction metrics for monitoring
54#[derive(Debug, Default)]
55pub struct TransactionMetrics {
56    /// Number of currently active transactions
57    active_count: AtomicU64,
58    /// Total transactions committed
59    total_commits: AtomicU64,
60    /// Total transactions rolled back
61    total_rollbacks: AtomicU64,
62    /// Number of long-running transactions (updated periodically)
63    long_running_count: AtomicU64,
64    /// Total duration of all completed transactions (microseconds)
65    total_duration_us: AtomicU64,
66    /// Total number of transactions (commits + rollbacks)
67    total_transactions: AtomicU64,
68}
69
70impl TransactionMetrics {
71    pub fn new() -> Self {
72        Self::default()
73    }
74
75    pub fn record_begin(&self) {
76        self.active_count.fetch_add(1, Ordering::Relaxed);
77    }
78
79    pub fn record_commit(&self, duration: Duration) {
80        self.active_count.fetch_sub(1, Ordering::Relaxed);
81        self.total_commits.fetch_add(1, Ordering::Relaxed);
82        self.total_transactions.fetch_add(1, Ordering::Relaxed);
83        self.total_duration_us
84            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
85    }
86
87    pub fn record_rollback(&self, duration: Duration) {
88        self.active_count.fetch_sub(1, Ordering::Relaxed);
89        self.total_rollbacks.fetch_add(1, Ordering::Relaxed);
90        self.total_transactions.fetch_add(1, Ordering::Relaxed);
91        self.total_duration_us
92            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
93    }
94
95    pub fn update_long_running_count(&self, count: u64) {
96        self.long_running_count.store(count, Ordering::Relaxed);
97    }
98
99    /// Get a snapshot of metrics
100    pub fn snapshot(&self) -> TransactionMetricsSnapshot {
101        let active = self.active_count.load(Ordering::Relaxed);
102        let commits = self.total_commits.load(Ordering::Relaxed);
103        let rollbacks = self.total_rollbacks.load(Ordering::Relaxed);
104        let long_running = self.long_running_count.load(Ordering::Relaxed);
105        let total_duration = self.total_duration_us.load(Ordering::Relaxed);
106        let total = self.total_transactions.load(Ordering::Relaxed);
107
108        let avg_duration_us = if total > 0 { total_duration / total } else { 0 };
109
110        TransactionMetricsSnapshot {
111            active_count: active,
112            total_commits: commits,
113            total_rollbacks: rollbacks,
114            long_running_count: long_running,
115            avg_duration_us,
116            total_transactions: total,
117        }
118    }
119}
120
121/// Snapshot of transaction metrics at a point in time
122#[derive(Debug, Clone)]
123pub struct TransactionMetricsSnapshot {
124    /// Number of currently active transactions
125    pub active_count: u64,
126    /// Total transactions committed
127    pub total_commits: u64,
128    /// Total transactions rolled back
129    pub total_rollbacks: u64,
130    /// Number of long-running transactions
131    pub long_running_count: u64,
132    /// Average transaction duration (microseconds)
133    pub avg_duration_us: u64,
134    /// Total number of transactions
135    pub total_transactions: u64,
136}
137
138impl TransactionMetricsSnapshot {
139    /// Calculate commit rate (commits / total transactions)
140    /// Returns 0.0 if no transactions have completed
141    pub fn commit_rate(&self) -> f64 {
142        if self.total_transactions == 0 {
143            0.0
144        } else {
145            self.total_commits as f64 / self.total_transactions as f64
146        }
147    }
148}
149
150/// Internal transaction state
151#[allow(dead_code)]
152struct TransactionState {
153    id: TransactionId,
154    mode: TransactionMode,
155    status: TransactionStatus,
156    snapshot: Snapshot,
157    start_time: Instant,
158    last_lsn: Lsn,
159    /// Write set: (table_name, serialized_pk) for conflict detection
160    write_set: HashSet<(String, Vec<u8>)>,
161}
162
163/// Manages all transactions in the database
164pub struct TransactionManager {
165    /// Next transaction ID to assign
166    next_txn_id: AtomicU64,
167    /// Currently active transactions
168    active_txns: RwLock<HashMap<TransactionId, TransactionState>>,
169    /// Write lock for serializing writers
170    write_lock: Mutex<()>,
171    /// Oldest active transaction ID (for GC)
172    oldest_active: AtomicU64,
173    /// Version store for old versions
174    version_store: RwLock<VersionStore>,
175    /// Aborted transaction IDs - these transactions' writes should be invisible
176    aborted_txns: RwLock<HashSet<TransactionId>>,
177    /// Fast-path flag: true if any transaction has ever been aborted
178    has_aborted: AtomicBool,
179    /// Transaction metrics
180    metrics: TransactionMetrics,
181    /// Pages dirtied by each active transaction (for deferred flush on commit)
182    dirty_pages: RwLock<HashMap<TransactionId, HashSet<PageId>>>,
183    /// Write sets of recently committed transactions for conflict detection.
184    /// Maps committed txn_id to its write set. Cleaned up when older than oldest active txn.
185    #[allow(clippy::type_complexity)]
186    committed_writes: RwLock<HashMap<TransactionId, HashSet<(String, Vec<u8>)>>>,
187}
188
189impl TransactionManager {
190    /// Create a new transaction manager
191    ///
192    /// For new databases, use this constructor. For restoring from a persisted
193    /// database, use `TransactionManager::with_start_txn_id()` instead.
194    pub fn new() -> Self {
195        TransactionManager {
196            next_txn_id: AtomicU64::new(1),
197            active_txns: RwLock::new(HashMap::new()),
198            write_lock: Mutex::new(()),
199            oldest_active: AtomicU64::new(u64::MAX),
200            version_store: RwLock::new(VersionStore::new()),
201            aborted_txns: RwLock::new(HashSet::new()),
202            has_aborted: AtomicBool::new(false),
203            metrics: TransactionMetrics::new(),
204            dirty_pages: RwLock::new(HashMap::new()),
205            committed_writes: RwLock::new(HashMap::new()),
206        }
207    }
208
209    /// Create a new transaction manager with a specific starting transaction ID.
210    ///
211    /// Used when restoring from a persisted database to maintain MVCC visibility.
212    /// The `start_txn_id` should be the value persisted from the previous session,
213    /// ensuring transaction IDs remain monotonically increasing across restarts.
214    ///
215    /// For new databases, use `TransactionManager::new()` instead.
216    pub fn with_start_txn_id(start_txn_id: u64) -> Self {
217        TransactionManager {
218            next_txn_id: AtomicU64::new(start_txn_id),
219            active_txns: RwLock::new(HashMap::new()),
220            write_lock: Mutex::new(()),
221            oldest_active: AtomicU64::new(u64::MAX),
222            version_store: RwLock::new(VersionStore::new()),
223            aborted_txns: RwLock::new(HashSet::new()),
224            has_aborted: AtomicBool::new(false),
225            metrics: TransactionMetrics::new(),
226            dirty_pages: RwLock::new(HashMap::new()),
227            committed_writes: RwLock::new(HashMap::new()),
228        }
229    }
230
231    /// Get the current next transaction ID (for persistence)
232    ///
233    /// This value should be persisted to ensure transaction IDs remain
234    /// monotonically increasing across database restarts.
235    pub fn next_txn_id(&self) -> u64 {
236        self.next_txn_id.load(Ordering::SeqCst)
237    }
238
239    /// Begin a new transaction with default configuration
240    pub fn begin(&self, mode: TransactionMode) -> Transaction<'_> {
241        self.begin_with_config(mode, TransactionConfig::default())
242    }
243
244    /// Begin a new transaction with specific configuration
245    pub fn begin_with_config(
246        &self,
247        mode: TransactionMode,
248        config: TransactionConfig,
249    ) -> Transaction<'_> {
250        let txn_id = TransactionId(self.next_txn_id.fetch_add(1, Ordering::SeqCst));
251        let created_at = Instant::now();
252
253        // Create snapshot of current state
254        let snapshot = {
255            let active = self.active_txns.read();
256            let in_progress: HashSet<_> = active.keys().copied().collect();
257            let hwm = TransactionId(self.next_txn_id.load(Ordering::SeqCst));
258            Snapshot::new(txn_id, hwm, in_progress)
259        };
260
261        let state = TransactionState {
262            id: txn_id,
263            mode,
264            status: TransactionStatus::Active,
265            snapshot: snapshot.clone(),
266            start_time: created_at,
267            last_lsn: Lsn::ZERO,
268            write_set: HashSet::new(),
269        };
270
271        // Register transaction
272        {
273            let mut active = self.active_txns.write();
274            active.insert(txn_id, state);
275
276            // Update oldest active
277            let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
278            self.oldest_active.store(oldest, Ordering::SeqCst);
279        }
280
281        // Record transaction begin in metrics
282        self.metrics.record_begin();
283
284        Transaction {
285            id: txn_id,
286            mode,
287            snapshot,
288            manager: self,
289            created_at,
290            config,
291            warned: false,
292        }
293    }
294
295    /// Begin a read-only transaction
296    pub fn begin_read_only(&self) -> Transaction<'_> {
297        self.begin(TransactionMode::ReadOnly)
298    }
299
300    /// Begin a read-write transaction
301    pub fn begin_read_write(&self) -> Transaction<'_> {
302        self.begin(TransactionMode::ReadWrite)
303    }
304
305    /// Commit a transaction (public API without WAL)
306    pub fn commit_txn(&self, txn_id: TransactionId) -> Result<()> {
307        self.commit(txn_id, None)
308    }
309
310    /// Abort a transaction (public API without WAL)
311    pub fn abort_txn(&self, txn_id: TransactionId) -> Result<()> {
312        self.abort(txn_id, None)
313    }
314
315    /// Record a write operation for conflict detection (first-committer-wins).
316    ///
317    /// Called by the DML executor for each row modified by INSERT, UPDATE, or DELETE.
318    /// The (table_name, pk_bytes) pair is added to the transaction's write set,
319    /// which is checked against committed write sets at commit time.
320    /// Add an ancestor to a transaction's snapshot (for savepoint support).
321    ///
322    /// When a savepoint creates a sub-transaction, the sub-transaction needs to
323    /// recognize the parent transaction as an ancestor so that write-write conflict
324    /// detection doesn't flag writes from the same logical transaction.
325    pub fn add_snapshot_ancestor(&self, txn_id: TransactionId, ancestor: TransactionId) {
326        let mut active = self.active_txns.write();
327        if let Some(state) = active.get_mut(&txn_id) {
328            state.snapshot.add_ancestor(ancestor);
329        }
330    }
331
332    /// Record a write and check for conflicts (first-committer-wins).
333    ///
334    /// Returns `Err(WriteConflict)` if another transaction has already written
335    /// to the same (table, pk) — either committed after our snapshot or still active.
336    pub fn record_write(
337        &self,
338        txn_id: TransactionId,
339        table: String,
340        pk_bytes: Vec<u8>,
341    ) -> Result<()> {
342        let entry = (table.clone(), pk_bytes);
343
344        // Get our snapshot for visibility checks
345        let snapshot = {
346            let active = self.active_txns.read();
347            match active.get(&txn_id) {
348                Some(state) => state.snapshot.clone(),
349                None => return Ok(()), // txn not found, nothing to do
350            }
351        };
352
353        // Check committed_writes for conflicts (txns committed after our snapshot)
354        {
355            let committed = self.committed_writes.read();
356            for (&committed_txn_id, committed_ws) in committed.iter() {
357                if !snapshot.can_see(committed_txn_id) && committed_ws.contains(&entry) {
358                    return Err(Error::WriteConflict { table });
359                }
360            }
361        }
362
363        // Check active transactions' write sets for conflicts (other in-progress txns).
364        // Skip ancestor transactions (savepoint sub-txns in the same logical txn).
365        {
366            let active = self.active_txns.read();
367            for (&other_txn_id, other_state) in active.iter() {
368                if other_txn_id != txn_id
369                    && !snapshot.is_ancestor(other_txn_id)
370                    && other_state.write_set.contains(&entry)
371                {
372                    return Err(Error::WriteConflict { table });
373                }
374            }
375        }
376
377        // No conflict — record the write
378        let mut active = self.active_txns.write();
379        if let Some(state) = active.get_mut(&txn_id) {
380            state.write_set.insert(entry);
381        }
382
383        Ok(())
384    }
385
386    /// Commit a transaction
387    fn commit(&self, txn_id: TransactionId, wal: Option<&mut Wal>) -> Result<()> {
388        let (duration, result) = {
389            let mut active = self.active_txns.write();
390
391            let state = active.get_mut(&txn_id).ok_or(Error::TransactionEnded)?;
392
393            if state.status != TransactionStatus::Active {
394                return Err(Error::TransactionEnded);
395            }
396
397            let start_time = state.start_time;
398
399            // Write-write conflict detection (first-committer-wins).
400            // Extract what we need before the conflict check to avoid borrow issues.
401            let write_set = std::mem::take(&mut state.write_set);
402            let snapshot = state.snapshot.clone();
403
404            if !write_set.is_empty() {
405                let committed = self.committed_writes.read();
406                let mut conflict_table: Option<String> = None;
407
408                'outer: for (&committed_txn_id, committed_ws) in committed.iter() {
409                    // Only check transactions that are NOT visible to our snapshot
410                    // (i.e., they committed after we started)
411                    if !snapshot.can_see(committed_txn_id) {
412                        for entry in &write_set {
413                            if committed_ws.contains(entry) {
414                                conflict_table = Some(entry.0.clone());
415                                break 'outer;
416                            }
417                        }
418                    }
419                }
420                drop(committed);
421
422                if let Some(table) = conflict_table {
423                    // Conflict detected — abort this transaction
424                    if let Some(s) = active.get_mut(&txn_id) {
425                        s.status = TransactionStatus::Aborted;
426                    }
427                    active.remove(&txn_id);
428                    let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
429                    self.oldest_active.store(oldest, Ordering::SeqCst);
430                    drop(active);
431                    self.aborted_txns.write().insert(txn_id);
432                    self.has_aborted.store(true, Ordering::Release);
433                    self.metrics.record_rollback(start_time.elapsed());
434                    return Err(Error::WriteConflict { table });
435                }
436            }
437
438            state.status = TransactionStatus::Committing;
439
440            // Log commit to WAL
441            if let Some(wal) = wal {
442                wal.log_commit(txn_id, state.last_lsn)?;
443            }
444
445            state.status = TransactionStatus::Committed;
446
447            // Move write set to committed_writes for future conflict checks
448            if !write_set.is_empty() {
449                self.committed_writes.write().insert(txn_id, write_set);
450            }
451
452            // Remove from active transactions
453            active.remove(&txn_id);
454
455            // Update oldest active
456            let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
457            self.oldest_active.store(oldest, Ordering::SeqCst);
458
459            // GC committed_writes: only safe when no active transactions remain,
460            // because an active txn may have started while a now-committed txn was
461            // in-progress (making it invisible to the active txn's snapshot).
462            let oldest_val = self.oldest_active.load(Ordering::SeqCst);
463            if oldest_val == u64::MAX {
464                // No active transactions — all committed writes can be cleaned up
465                self.committed_writes.write().clear();
466            }
467
468            (start_time.elapsed(), Ok(()))
469        };
470
471        // Record commit in metrics
472        self.metrics.record_commit(duration);
473
474        result
475    }
476
477    /// Abort a transaction
478    fn abort(&self, txn_id: TransactionId, wal: Option<&mut Wal>) -> Result<()> {
479        let (duration, result) = {
480            let mut active = self.active_txns.write();
481
482            let state = active.get_mut(&txn_id).ok_or(Error::TransactionEnded)?;
483
484            if state.status != TransactionStatus::Active {
485                return Err(Error::TransactionEnded);
486            }
487
488            let start_time = state.start_time;
489
490            // Log abort to WAL
491            if let Some(wal) = wal {
492                wal.log_abort(txn_id, state.last_lsn)?;
493            }
494
495            state.status = TransactionStatus::Aborted;
496
497            // Remove from active transactions
498            active.remove(&txn_id);
499
500            // Add to aborted transactions set so visibility checks can exclude its writes
501            self.aborted_txns.write().insert(txn_id);
502            self.has_aborted.store(true, Ordering::Release);
503
504            // Update oldest active
505            let oldest = active.keys().min().map(|t| t.0).unwrap_or(u64::MAX);
506            self.oldest_active.store(oldest, Ordering::SeqCst);
507
508            (start_time.elapsed(), Ok(()))
509        };
510
511        // Record rollback in metrics
512        self.metrics.record_rollback(duration);
513
514        result
515    }
516
517    /// Acquire write lock for exclusive writing
518    pub fn acquire_write_lock(&self) -> impl Drop + '_ {
519        self.write_lock.lock()
520    }
521
522    /// Update last LSN for a transaction (used for WAL integration)
523    #[allow(dead_code)]
524    fn update_last_lsn(&self, txn_id: TransactionId, lsn: Lsn) {
525        let mut active = self.active_txns.write();
526        if let Some(state) = active.get_mut(&txn_id) {
527            state.last_lsn = lsn;
528        }
529    }
530
531    /// Get the oldest active transaction ID
532    pub fn oldest_active_txn(&self) -> Option<TransactionId> {
533        let oldest = self.oldest_active.load(Ordering::SeqCst);
534        if oldest == u64::MAX {
535            None
536        } else {
537            Some(TransactionId(oldest))
538        }
539    }
540
541    /// Get the version store
542    pub fn version_store(&self) -> &RwLock<VersionStore> {
543        &self.version_store
544    }
545
546    /// Get number of active transactions
547    pub fn active_count(&self) -> usize {
548        self.active_txns.read().len()
549    }
550
551    /// Quick check whether any aborted transactions exist.
552    /// When false, all `is_aborted` calls are guaranteed to return false.
553    #[inline]
554    pub fn has_aborted_transactions(&self) -> bool {
555        self.has_aborted.load(Ordering::Acquire)
556    }
557
558    /// Check if a transaction was aborted
559    #[inline]
560    pub fn is_aborted(&self, txn_id: TransactionId) -> bool {
561        // Fast path: if no transactions have ever been aborted, skip the lock
562        if !self.has_aborted.load(Ordering::Acquire) {
563            return false;
564        }
565        self.aborted_txns.read().contains(&txn_id)
566    }
567
568    /// Check if a row is visible to a snapshot, accounting for aborted transactions.
569    ///
570    /// A row is visible if:
571    /// 1. The creating transaction was NOT aborted
572    /// 2. The snapshot can see the creating transaction (per standard MVCC rules)
573    /// 3. Either the row is not deleted, the deleting transaction was aborted,
574    ///    or the snapshot cannot see the deletion
575    #[inline]
576    pub fn is_row_visible(
577        &self,
578        snapshot: &Snapshot,
579        created_by: TransactionId,
580        deleted_by: Option<TransactionId>,
581    ) -> bool {
582        // Check if the creating transaction was aborted
583        // If it was aborted, the row should not be visible to anyone
584        if self.is_aborted(created_by) {
585            return false;
586        }
587
588        // If there's a deletion, check if the deleting transaction was aborted
589        // If aborted, treat the row as not deleted
590        let effective_deleted_by = match deleted_by {
591            Some(del_by) if self.is_aborted(del_by) => None, // Deletion was aborted, ignore it
592            other => other,
593        };
594
595        // Now check standard MVCC visibility with the effective deletion status
596        snapshot.is_visible(created_by, effective_deleted_by)
597    }
598
599    /// Get the number of aborted transactions being tracked
600    pub fn aborted_count(&self) -> usize {
601        self.aborted_txns.read().len()
602    }
603
604    /// Clean up old aborted transaction IDs that are no longer needed.
605    ///
606    /// Aborted transaction IDs can be removed once all their written data
607    /// has been garbage collected. This is typically safe to do when
608    /// the aborted transaction ID is older than any active transaction's
609    /// snapshot, since all such data would have been cleaned up by GC.
610    pub fn cleanup_aborted_txns(&self, older_than: TransactionId) {
611        let mut aborted = self.aborted_txns.write();
612        aborted.retain(|&txn_id| txn_id >= older_than);
613        if aborted.is_empty() {
614            self.has_aborted.store(false, Ordering::Release);
615        }
616    }
617
618    // ==================== Dirty Page Tracking ====================
619
620    /// Register a page as dirtied by a transaction
621    ///
622    /// This is called when a transaction modifies a page. The page ID
623    /// is tracked so that on commit, only this transaction's pages are
624    /// flushed, and on abort, they can be discarded.
625    pub fn register_dirty_page(&self, txn_id: TransactionId, page_id: PageId) {
626        let mut dirty = self.dirty_pages.write();
627        dirty.entry(txn_id).or_default().insert(page_id);
628    }
629
630    /// Get all pages dirtied by a transaction
631    pub fn get_dirty_pages(&self, txn_id: TransactionId) -> HashSet<PageId> {
632        self.dirty_pages
633            .read()
634            .get(&txn_id)
635            .cloned()
636            .unwrap_or_default()
637    }
638
639    /// Clear dirty page tracking for a transaction (called on commit/abort)
640    pub fn clear_dirty_pages(&self, txn_id: TransactionId) {
641        self.dirty_pages.write().remove(&txn_id);
642    }
643
644    /// Get the number of dirty pages for a transaction
645    pub fn dirty_page_count(&self, txn_id: TransactionId) -> usize {
646        self.dirty_pages
647            .read()
648            .get(&txn_id)
649            .map(|s| s.len())
650            .unwrap_or(0)
651    }
652
653    /// Run garbage collection on old versions.
654    ///
655    /// Removes old versions that no active transaction can see.
656    /// Returns statistics about what was collected.
657    pub fn run_gc(&self) -> crate::version::GcStats {
658        let oldest = self.oldest_active_txn();
659        let mut version_store = self.version_store.write();
660        version_store.gc(oldest)
661    }
662
663    /// Get the number of old versions in the version store
664    pub fn version_count(&self) -> usize {
665        self.version_store.read().len()
666    }
667
668    /// Get information about long-running transactions
669    ///
670    /// Returns a list of transactions that have been running longer than the
671    /// specified threshold, sorted by age (oldest first).
672    pub fn long_running_transactions(&self, threshold: Duration) -> Vec<TransactionInfo> {
673        let active = self.active_txns.read();
674        let now = Instant::now();
675
676        let mut results: Vec<TransactionInfo> = active
677            .iter()
678            .filter_map(|(id, state)| {
679                let elapsed = now.duration_since(state.start_time);
680                if elapsed >= threshold {
681                    Some(TransactionInfo {
682                        id: id.0,
683                        age: elapsed,
684                    })
685                } else {
686                    None
687                }
688            })
689            .collect();
690
691        // Sort by age, oldest first
692        results.sort_by(|a, b| b.age.cmp(&a.age));
693        results
694    }
695
696    /// Get current GC status
697    ///
698    /// Returns information about whether garbage collection is blocked
699    /// by active transactions.
700    pub fn gc_status(&self) -> GcStatus {
701        let active = self.active_txns.read();
702        let oldest = active.keys().min().copied();
703
704        GcStatus {
705            oldest_active_txn: oldest.map(|id| id.0),
706            blocked: oldest.is_some(),
707            blocked_reason: oldest
708                .map(|id| format!("Waiting for transaction {} to complete", id.0)),
709        }
710    }
711
712    /// Get transaction metrics snapshot
713    ///
714    /// Returns a snapshot of transaction metrics including active count,
715    /// commits, rollbacks, and average transaction duration.
716    pub fn metrics(&self) -> TransactionMetricsSnapshot {
717        // Update long-running count before taking snapshot
718        let long_running = self.long_running_transactions(Duration::from_secs(30));
719        self.metrics
720            .update_long_running_count(long_running.len() as u64);
721
722        self.metrics.snapshot()
723    }
724}
725
726impl Default for TransactionManager {
727    fn default() -> Self {
728        Self::new()
729    }
730}
731
732/// A database transaction
733pub struct Transaction<'a> {
734    /// Transaction ID
735    id: TransactionId,
736    /// Transaction mode
737    mode: TransactionMode,
738    /// Snapshot for visibility
739    snapshot: Snapshot,
740    /// Reference to transaction manager
741    manager: &'a TransactionManager,
742    /// Time when transaction was created
743    created_at: Instant,
744    /// Transaction configuration (timeout, warning threshold)
745    config: TransactionConfig,
746    /// Track if we've already issued a warning
747    warned: bool,
748}
749
750impl<'a> Transaction<'a> {
751    /// Get the transaction ID
752    pub fn id(&self) -> TransactionId {
753        self.id
754    }
755
756    /// Get the transaction mode
757    pub fn mode(&self) -> TransactionMode {
758        self.mode
759    }
760
761    /// Get the snapshot
762    pub fn snapshot(&self) -> &Snapshot {
763        &self.snapshot
764    }
765
766    /// Check if this is a read-only transaction
767    pub fn is_read_only(&self) -> bool {
768        self.mode == TransactionMode::ReadOnly
769    }
770
771    /// Commit the transaction
772    pub fn commit(self) -> Result<()> {
773        self.check_timeout()?;
774        self.manager.commit(self.id, None)
775    }
776
777    /// Commit with WAL
778    pub fn commit_with_wal(self, wal: &mut Wal) -> Result<()> {
779        self.check_timeout()?;
780        self.manager.commit(self.id, Some(wal))
781    }
782
783    /// Abort the transaction
784    pub fn abort(self) -> Result<()> {
785        self.manager.abort(self.id, None)
786    }
787
788    /// Abort with WAL
789    pub fn abort_with_wal(self, wal: &mut Wal) -> Result<()> {
790        self.manager.abort(self.id, Some(wal))
791    }
792
793    /// Check if a version is visible
794    pub fn can_see(&self, created_by: TransactionId) -> bool {
795        self.snapshot.can_see(created_by)
796    }
797
798    /// Check write permission
799    pub fn check_write(&self) -> Result<()> {
800        self.check_timeout()?;
801        if self.mode == TransactionMode::ReadOnly {
802            return Err(Error::ReadOnly);
803        }
804        Ok(())
805    }
806
807    /// Check read permission (primarily for timeout enforcement)
808    pub fn check_read(&self) -> Result<()> {
809        self.check_timeout()
810    }
811
812    /// Check if this transaction has exceeded its timeout
813    pub fn is_expired(&self) -> bool {
814        if let Some(timeout_ms) = self.config.timeout_ms {
815            self.created_at.elapsed().as_millis() as u64 > timeout_ms
816        } else {
817            false
818        }
819    }
820
821    /// Get remaining time before timeout (None if no timeout configured or already expired)
822    pub fn remaining_time(&self) -> Option<Duration> {
823        let timeout_ms = self.config.timeout_ms?;
824        let elapsed = self.created_at.elapsed();
825        let timeout = Duration::from_millis(timeout_ms);
826        if elapsed >= timeout {
827            None
828        } else {
829            Some(timeout - elapsed)
830        }
831    }
832
833    /// Get elapsed time since transaction started
834    pub fn elapsed(&self) -> Duration {
835        self.created_at.elapsed()
836    }
837
838    /// Check if warning threshold exceeded (call this periodically)
839    /// Returns true only once when the threshold is first exceeded
840    pub fn should_warn(&mut self) -> bool {
841        if self.warned {
842            return false;
843        }
844        if let Some(warn_ms) = self.config.warn_after_ms {
845            if self.created_at.elapsed().as_millis() as u64 > warn_ms {
846                self.warned = true;
847                return true;
848            }
849        }
850        false
851    }
852
853    /// Check if transaction is expired and return error if so
854    fn check_timeout(&self) -> Result<()> {
855        if self.is_expired() {
856            return Err(Error::transaction_timeout(
857                self.id.0,
858                self.created_at.elapsed().as_millis() as u64,
859                self.config.timeout_ms.unwrap_or(0),
860            ));
861        }
862        Ok(())
863    }
864}
865
866impl<'a> Drop for Transaction<'a> {
867    fn drop(&mut self) {
868        // Auto-abort if not committed
869        let _ = self.manager.abort(self.id, None);
870    }
871}
872
873#[cfg(test)]
874mod tests {
875    use super::*;
876
877    #[test]
878    fn test_transaction_manager() {
879        let tm = TransactionManager::new();
880
881        let txn1 = tm.begin_read_write();
882        assert_eq!(txn1.id(), TransactionId(1));
883
884        let txn2 = tm.begin_read_only();
885        assert_eq!(txn2.id(), TransactionId(2));
886
887        assert_eq!(tm.active_count(), 2);
888
889        txn1.commit().unwrap();
890        assert_eq!(tm.active_count(), 1);
891
892        txn2.abort().unwrap();
893        assert_eq!(tm.active_count(), 0);
894    }
895
896    #[test]
897    fn test_snapshot_isolation() {
898        let tm = TransactionManager::new();
899
900        let txn1 = tm.begin_read_write();
901        let txn2 = tm.begin_read_only();
902
903        // txn2 cannot see txn1's changes (txn1 started before txn2 but hasn't committed)
904        assert!(!txn2.can_see(txn1.id()));
905
906        // Each transaction can see its own changes
907        assert!(txn1.can_see(txn1.id()));
908        assert!(txn2.can_see(txn2.id()));
909    }
910
911    #[test]
912    fn test_read_only_cannot_write() {
913        let tm = TransactionManager::new();
914        let txn = tm.begin_read_only();
915
916        assert!(txn.check_write().is_err());
917    }
918
919    #[test]
920    fn test_auto_abort_on_drop() {
921        let tm = TransactionManager::new();
922
923        {
924            let _txn = tm.begin_read_write();
925            assert_eq!(tm.active_count(), 1);
926            // txn dropped here without commit
927        }
928
929        assert_eq!(tm.active_count(), 0);
930    }
931
932    #[test]
933    fn test_run_gc() {
934        use featherdb_core::Value;
935
936        let tm = TransactionManager::new();
937
938        // Add some old versions to the version store
939        {
940            let mut vs = tm.version_store().write();
941
942            // Insert version that was deleted by an old transaction
943            let old_version = crate::version::OldVersion {
944                data: vec![Value::Integer(42)],
945                created_by: TransactionId(1),
946                deleted_by: Some(TransactionId(2)),
947                prev_version: None,
948            };
949            vs.insert(old_version);
950
951            // Insert version deleted by a future transaction
952            let new_version = crate::version::OldVersion {
953                data: vec![Value::Integer(100)],
954                created_by: TransactionId(3),
955                deleted_by: Some(TransactionId(100)),
956                prev_version: None,
957            };
958            vs.insert(new_version);
959        }
960
961        assert_eq!(tm.version_count(), 2);
962
963        // No active transactions, so GC should clean up all deleted versions
964        let stats = tm.run_gc();
965
966        assert_eq!(stats.versions_removed, 2);
967        assert_eq!(tm.version_count(), 0);
968    }
969
970    #[test]
971    fn test_gc_preserves_versions_for_active_transactions() {
972        let tm = TransactionManager::new();
973
974        // Add an old version deleted at transaction 5
975        {
976            let mut vs = tm.version_store().write();
977            let old_version = crate::version::OldVersion {
978                data: vec![featherdb_core::Value::Integer(42)],
979                created_by: TransactionId(1),
980                deleted_by: Some(TransactionId(5)),
981                prev_version: None,
982            };
983            vs.insert(old_version);
984        }
985
986        // Start a transaction (will be txn id 1)
987        let txn = tm.begin_read_write();
988
989        // Advance transaction IDs by starting and committing several transactions
990        for _ in 0..10 {
991            let t = tm.begin_read_write();
992            t.commit().unwrap();
993        }
994
995        // Now run GC - should NOT remove version because txn is still active
996        // (txn was started before we simulate the deletion)
997        // Actually, we need to think about this more carefully:
998        // The oldest active is txn.id() = 1
999        // The version was deleted_by = 5
1000        // So 5 >= 1, meaning NOT all active can see the deletion
1001        // Therefore GC should NOT collect this version
1002
1003        let stats = tm.run_gc();
1004
1005        // Version should NOT be removed because oldest active (txn id 1)
1006        // cannot see the deletion (deleted_by 5 >= oldest 1)
1007        assert_eq!(stats.versions_removed, 0);
1008        assert_eq!(tm.version_count(), 1);
1009
1010        // Commit the transaction
1011        txn.commit().unwrap();
1012
1013        // Now GC should clean it up (no active transactions)
1014        let stats = tm.run_gc();
1015        assert_eq!(stats.versions_removed, 1);
1016        assert_eq!(tm.version_count(), 0);
1017    }
1018
1019    #[test]
1020    fn test_transaction_default_has_timeout() {
1021        let tm = TransactionManager::new();
1022        let txn = tm.begin_read_write();
1023
1024        // Default config has 30 second timeout (safe default)
1025        assert!(!txn.is_expired());
1026        let remaining = txn.remaining_time();
1027        assert!(remaining.is_some());
1028        // Should be close to 30 seconds
1029        assert!(remaining.unwrap().as_secs() >= 29);
1030    }
1031
1032    #[test]
1033    fn test_transaction_no_limits() {
1034        let tm = TransactionManager::new();
1035        let config = TransactionConfig::no_limits();
1036        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1037
1038        // No timeout configured with no_limits()
1039        assert!(!txn.is_expired());
1040        assert_eq!(txn.remaining_time(), None);
1041    }
1042
1043    #[test]
1044    fn test_transaction_with_timeout() {
1045        use std::thread;
1046        use std::time::Duration;
1047
1048        let tm = TransactionManager::new();
1049        let config = TransactionConfig::new().with_timeout(100);
1050        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1051
1052        // Should not be expired immediately
1053        assert!(!txn.is_expired());
1054        assert!(txn.remaining_time().is_some());
1055
1056        // Wait for timeout to expire
1057        thread::sleep(Duration::from_millis(150));
1058
1059        // Should be expired now
1060        assert!(txn.is_expired());
1061        assert_eq!(txn.remaining_time(), None);
1062    }
1063
1064    #[test]
1065    fn test_transaction_elapsed_time() {
1066        use std::thread;
1067        use std::time::Duration;
1068
1069        let tm = TransactionManager::new();
1070        let txn = tm.begin_read_write();
1071
1072        // Elapsed time should be very small initially
1073        assert!(txn.elapsed().as_millis() < 10);
1074
1075        thread::sleep(Duration::from_millis(50));
1076
1077        // Should have elapsed at least 50ms
1078        assert!(txn.elapsed().as_millis() >= 50);
1079    }
1080
1081    #[test]
1082    fn test_transaction_warning_threshold() {
1083        use std::thread;
1084        use std::time::Duration;
1085
1086        let tm = TransactionManager::new();
1087        let config = TransactionConfig::new().with_warning(50);
1088        let mut txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1089
1090        // Should not warn immediately
1091        assert!(!txn.should_warn());
1092
1093        thread::sleep(Duration::from_millis(70));
1094
1095        // Should warn after threshold
1096        assert!(txn.should_warn());
1097
1098        // Should only warn once
1099        assert!(!txn.should_warn());
1100    }
1101
1102    #[test]
1103    fn test_transaction_timeout_and_warning() {
1104        use std::thread;
1105        use std::time::Duration;
1106
1107        let tm = TransactionManager::new();
1108        let config = TransactionConfig::with_timeout_and_warning(200, 50);
1109        let mut txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1110
1111        // Initially neither expired nor warning
1112        assert!(!txn.is_expired());
1113        assert!(!txn.should_warn());
1114
1115        thread::sleep(Duration::from_millis(70));
1116
1117        // Should warn but not expired yet
1118        assert!(txn.should_warn());
1119        assert!(!txn.is_expired());
1120        assert!(txn.remaining_time().is_some());
1121
1122        thread::sleep(Duration::from_millis(150));
1123
1124        // Should be expired now
1125        assert!(txn.is_expired());
1126        assert_eq!(txn.remaining_time(), None);
1127    }
1128
1129    #[test]
1130    fn test_transaction_remaining_time_decreases() {
1131        use std::thread;
1132        use std::time::Duration;
1133
1134        let tm = TransactionManager::new();
1135        let config = TransactionConfig::new().with_timeout(500);
1136        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1137
1138        let remaining1 = txn.remaining_time().unwrap();
1139        thread::sleep(Duration::from_millis(100));
1140        let remaining2 = txn.remaining_time().unwrap();
1141
1142        // Remaining time should decrease
1143        assert!(remaining2 < remaining1);
1144        assert!(remaining2.as_millis() < remaining1.as_millis());
1145    }
1146
1147    #[test]
1148    fn test_default_transaction_config() {
1149        let tm = TransactionManager::new();
1150        let config = TransactionConfig::default();
1151        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1152
1153        // Default config has 30 second timeout (safe default)
1154        assert!(!txn.is_expired());
1155        let remaining = txn.remaining_time();
1156        assert!(remaining.is_some());
1157        assert!(remaining.unwrap().as_secs() >= 29);
1158    }
1159
1160    #[test]
1161    fn test_timeout_prevents_commit() {
1162        use std::thread;
1163        use std::time::Duration;
1164
1165        let tm = TransactionManager::new();
1166        let config = TransactionConfig::new().with_timeout(50);
1167        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1168
1169        // Sleep past the timeout
1170        thread::sleep(Duration::from_millis(70));
1171
1172        // Commit should fail with TransactionTimeout error
1173        let result = txn.commit();
1174        assert!(result.is_err());
1175        match result {
1176            Err(Error::TransactionTimeout {
1177                transaction_id,
1178                elapsed_ms,
1179                timeout_ms,
1180            }) => {
1181                assert_eq!(transaction_id, 1);
1182                assert!(elapsed_ms >= 70);
1183                assert_eq!(timeout_ms, 50);
1184            }
1185            _ => panic!("Expected TransactionTimeout error"),
1186        }
1187    }
1188
1189    #[test]
1190    fn test_timeout_prevents_write_operations() {
1191        use std::thread;
1192        use std::time::Duration;
1193
1194        let tm = TransactionManager::new();
1195        let config = TransactionConfig::new().with_timeout(50);
1196        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1197
1198        // Sleep past the timeout
1199        thread::sleep(Duration::from_millis(70));
1200
1201        // check_write should fail with TransactionTimeout error
1202        let result = txn.check_write();
1203        assert!(result.is_err());
1204        match result {
1205            Err(Error::TransactionTimeout {
1206                transaction_id,
1207                elapsed_ms,
1208                timeout_ms,
1209            }) => {
1210                assert_eq!(transaction_id, 1);
1211                assert!(elapsed_ms >= 70);
1212                assert_eq!(timeout_ms, 50);
1213            }
1214            _ => panic!("Expected TransactionTimeout error"),
1215        }
1216    }
1217
1218    #[test]
1219    fn test_timeout_prevents_read_operations() {
1220        use std::thread;
1221        use std::time::Duration;
1222
1223        let tm = TransactionManager::new();
1224        let config = TransactionConfig::new().with_timeout(50);
1225        let txn = tm.begin_with_config(TransactionMode::ReadOnly, config);
1226
1227        // Sleep past the timeout
1228        thread::sleep(Duration::from_millis(70));
1229
1230        // check_read should fail with TransactionTimeout error
1231        let result = txn.check_read();
1232        assert!(result.is_err());
1233        match result {
1234            Err(Error::TransactionTimeout {
1235                transaction_id,
1236                elapsed_ms,
1237                timeout_ms,
1238            }) => {
1239                assert_eq!(transaction_id, 1);
1240                assert!(elapsed_ms >= 70);
1241                assert_eq!(timeout_ms, 50);
1242            }
1243            _ => panic!("Expected TransactionTimeout error"),
1244        }
1245    }
1246
1247    #[test]
1248    fn test_no_timeout_allows_operations() {
1249        use std::thread;
1250        use std::time::Duration;
1251
1252        let tm = TransactionManager::new();
1253        let txn = tm.begin_read_write();
1254
1255        // Sleep for a while
1256        thread::sleep(Duration::from_millis(50));
1257
1258        // Operations should succeed without timeout configured
1259        assert!(txn.check_write().is_ok());
1260        assert!(txn.check_read().is_ok());
1261        assert!(txn.commit().is_ok());
1262    }
1263
1264    #[test]
1265    fn test_operations_succeed_within_timeout() {
1266        use std::thread;
1267        use std::time::Duration;
1268
1269        let tm = TransactionManager::new();
1270        let config = TransactionConfig::new().with_timeout(200);
1271        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1272
1273        // Sleep but stay within timeout
1274        thread::sleep(Duration::from_millis(50));
1275
1276        // Operations should succeed
1277        assert!(txn.check_write().is_ok());
1278        assert!(txn.check_read().is_ok());
1279        assert!(txn.commit().is_ok());
1280    }
1281
1282    #[test]
1283    fn test_timeout_error_message() {
1284        use std::thread;
1285        use std::time::Duration;
1286
1287        let tm = TransactionManager::new();
1288        let config = TransactionConfig::new().with_timeout(30);
1289        let txn = tm.begin_with_config(TransactionMode::ReadWrite, config);
1290
1291        thread::sleep(Duration::from_millis(50));
1292
1293        let result = txn.commit();
1294        assert!(result.is_err());
1295
1296        let error_msg = result.unwrap_err().to_string();
1297        assert!(error_msg.contains("Transaction"));
1298        assert!(error_msg.contains("exceeded timeout"));
1299        assert!(error_msg.contains("30ms"));
1300    }
1301
1302    #[test]
1303    fn test_long_running_transactions_empty() {
1304        use std::time::Duration;
1305
1306        let tm = TransactionManager::new();
1307
1308        // No transactions running
1309        let long_running = tm.long_running_transactions(Duration::from_secs(1));
1310        assert!(long_running.is_empty());
1311    }
1312
1313    #[test]
1314    fn test_long_running_transactions_below_threshold() {
1315        use std::time::Duration;
1316
1317        let tm = TransactionManager::new();
1318        let _txn1 = tm.begin_read_write();
1319        let _txn2 = tm.begin_read_only();
1320
1321        // Transactions just started, should not appear as long-running
1322        let long_running = tm.long_running_transactions(Duration::from_secs(1));
1323        assert!(long_running.is_empty());
1324    }
1325
1326    #[test]
1327    fn test_long_running_transactions_above_threshold() {
1328        use std::thread;
1329        use std::time::Duration;
1330
1331        let tm = TransactionManager::new();
1332        let _txn1 = tm.begin_read_write();
1333
1334        // Sleep past threshold
1335        thread::sleep(Duration::from_millis(100));
1336
1337        let _txn2 = tm.begin_read_only();
1338
1339        // Check with 50ms threshold
1340        let long_running = tm.long_running_transactions(Duration::from_millis(50));
1341
1342        // Both transactions should appear (txn1 > 100ms, txn2 > 0ms but we check against 50ms)
1343        // Actually only txn1 should appear since txn2 was just created
1344        assert_eq!(long_running.len(), 1);
1345        assert_eq!(long_running[0].id, 1);
1346        assert!(long_running[0].age >= Duration::from_millis(100));
1347    }
1348
1349    #[test]
1350    fn test_long_running_transactions_sorted_by_age() {
1351        use std::thread;
1352        use std::time::Duration;
1353
1354        let tm = TransactionManager::new();
1355        let _txn1 = tm.begin_read_write();
1356
1357        thread::sleep(Duration::from_millis(50));
1358        let _txn2 = tm.begin_read_only();
1359
1360        thread::sleep(Duration::from_millis(50));
1361        let _txn3 = tm.begin_read_write();
1362
1363        // Sleep a bit more to ensure all transactions are above threshold
1364        thread::sleep(Duration::from_millis(30));
1365
1366        // All transactions should be above 20ms threshold
1367        let long_running = tm.long_running_transactions(Duration::from_millis(20));
1368
1369        // Should be sorted by age, oldest first
1370        assert_eq!(long_running.len(), 3);
1371        assert_eq!(long_running[0].id, 1); // Oldest
1372        assert_eq!(long_running[1].id, 2); // Middle
1373        assert_eq!(long_running[2].id, 3); // Youngest
1374
1375        // Verify ages are in descending order
1376        assert!(long_running[0].age >= long_running[1].age);
1377        assert!(long_running[1].age >= long_running[2].age);
1378    }
1379
1380    #[test]
1381    fn test_long_running_transactions_after_commit() {
1382        use std::thread;
1383        use std::time::Duration;
1384
1385        let tm = TransactionManager::new();
1386        let txn1 = tm.begin_read_write();
1387
1388        thread::sleep(Duration::from_millis(50));
1389
1390        // Transaction should appear as long-running
1391        let long_running = tm.long_running_transactions(Duration::from_millis(20));
1392        assert_eq!(long_running.len(), 1);
1393        assert_eq!(long_running[0].id, 1);
1394
1395        // Commit the transaction
1396        txn1.commit().unwrap();
1397
1398        // Should no longer appear
1399        let long_running = tm.long_running_transactions(Duration::from_millis(20));
1400        assert!(long_running.is_empty());
1401    }
1402
1403    #[test]
1404    fn test_gc_status_no_active_transactions() {
1405        let tm = TransactionManager::new();
1406
1407        let status = tm.gc_status();
1408        assert_eq!(status.oldest_active_txn, None);
1409        assert!(!status.blocked);
1410        assert_eq!(status.blocked_reason, None);
1411    }
1412
1413    #[test]
1414    fn test_gc_status_with_active_transaction() {
1415        let tm = TransactionManager::new();
1416        let _txn = tm.begin_read_write();
1417
1418        let status = tm.gc_status();
1419        assert_eq!(status.oldest_active_txn, Some(1));
1420        assert!(status.blocked);
1421        assert!(status.blocked_reason.is_some());
1422        assert!(status.blocked_reason.unwrap().contains("transaction 1"));
1423    }
1424
1425    #[test]
1426    fn test_gc_status_multiple_transactions() {
1427        let tm = TransactionManager::new();
1428        let _txn1 = tm.begin_read_write();
1429        let _txn2 = tm.begin_read_only();
1430        let _txn3 = tm.begin_read_write();
1431
1432        let status = tm.gc_status();
1433        // Oldest should be transaction 1
1434        assert_eq!(status.oldest_active_txn, Some(1));
1435        assert!(status.blocked);
1436        assert!(status.blocked_reason.unwrap().contains("transaction 1"));
1437    }
1438
1439    #[test]
1440    fn test_gc_status_after_oldest_commits() {
1441        let tm = TransactionManager::new();
1442        let txn1 = tm.begin_read_write();
1443        let _txn2 = tm.begin_read_only();
1444        let _txn3 = tm.begin_read_write();
1445
1446        // Initially blocked by transaction 1
1447        let status = tm.gc_status();
1448        assert_eq!(status.oldest_active_txn, Some(1));
1449        assert!(status.blocked);
1450
1451        // Commit transaction 1
1452        txn1.commit().unwrap();
1453
1454        // Now blocked by transaction 2
1455        let status = tm.gc_status();
1456        assert_eq!(status.oldest_active_txn, Some(2));
1457        assert!(status.blocked);
1458        assert!(status.blocked_reason.unwrap().contains("transaction 2"));
1459    }
1460
1461    #[test]
1462    fn test_gc_status_after_all_commit() {
1463        let tm = TransactionManager::new();
1464        let txn1 = tm.begin_read_write();
1465        let txn2 = tm.begin_read_only();
1466
1467        // Initially blocked
1468        let status = tm.gc_status();
1469        assert!(status.blocked);
1470
1471        // Commit all
1472        txn1.commit().unwrap();
1473        txn2.commit().unwrap();
1474
1475        // No longer blocked
1476        let status = tm.gc_status();
1477        assert_eq!(status.oldest_active_txn, None);
1478        assert!(!status.blocked);
1479        assert_eq!(status.blocked_reason, None);
1480    }
1481
1482    #[test]
1483    fn test_long_running_and_gc_status_integration() {
1484        use std::thread;
1485        use std::time::Duration;
1486
1487        let tm = TransactionManager::new();
1488
1489        // Start a long-running transaction
1490        let _old_txn = tm.begin_read_write();
1491
1492        thread::sleep(Duration::from_millis(100));
1493
1494        // Start newer transactions
1495        let _new_txn1 = tm.begin_read_only();
1496        let _new_txn2 = tm.begin_read_write();
1497
1498        // Check long-running transactions
1499        let long_running = tm.long_running_transactions(Duration::from_millis(50));
1500        assert_eq!(long_running.len(), 1);
1501        assert_eq!(long_running[0].id, 1);
1502
1503        // Check GC status
1504        let gc_status = tm.gc_status();
1505        assert_eq!(gc_status.oldest_active_txn, Some(1));
1506        assert!(gc_status.blocked);
1507        assert!(gc_status.blocked_reason.unwrap().contains("transaction 1"));
1508
1509        // The oldest transaction (1) is long-running and blocking GC
1510        assert_eq!(long_running[0].id, gc_status.oldest_active_txn.unwrap());
1511    }
1512
1513    #[test]
1514    fn test_transaction_info_structure() {
1515        use std::thread;
1516        use std::time::Duration;
1517
1518        let tm = TransactionManager::new();
1519        let _txn = tm.begin_read_write();
1520
1521        thread::sleep(Duration::from_millis(50));
1522
1523        let long_running = tm.long_running_transactions(Duration::from_millis(20));
1524        assert_eq!(long_running.len(), 1);
1525
1526        let info = &long_running[0];
1527        assert_eq!(info.id, 1);
1528        assert!(info.age >= Duration::from_millis(50));
1529        assert!(info.age < Duration::from_secs(1));
1530    }
1531
1532    #[test]
1533    fn test_gc_status_structure() {
1534        let tm = TransactionManager::new();
1535        let _txn = tm.begin_read_write();
1536
1537        let status = tm.gc_status();
1538        assert_eq!(status.oldest_active_txn, Some(1));
1539        assert!(status.blocked);
1540
1541        let reason = status.blocked_reason.unwrap();
1542        assert!(reason.contains("Waiting for"));
1543        assert!(reason.contains("transaction 1"));
1544        assert!(reason.contains("complete"));
1545    }
1546
1547    #[test]
1548    fn test_transaction_metrics_initial_state() {
1549        let tm = TransactionManager::new();
1550        let metrics = tm.metrics();
1551
1552        assert_eq!(metrics.active_count, 0);
1553        assert_eq!(metrics.total_commits, 0);
1554        assert_eq!(metrics.total_rollbacks, 0);
1555        assert_eq!(metrics.long_running_count, 0);
1556        assert_eq!(metrics.avg_duration_us, 0);
1557        assert_eq!(metrics.total_transactions, 0);
1558        assert_eq!(metrics.commit_rate(), 0.0);
1559    }
1560
1561    #[test]
1562    fn test_transaction_metrics_begin() {
1563        let tm = TransactionManager::new();
1564        let _txn1 = tm.begin_read_write();
1565        let _txn2 = tm.begin_read_only();
1566
1567        let metrics = tm.metrics();
1568        assert_eq!(metrics.active_count, 2);
1569        assert_eq!(metrics.total_commits, 0);
1570        assert_eq!(metrics.total_rollbacks, 0);
1571    }
1572
1573    #[test]
1574    fn test_transaction_metrics_commit() {
1575        use std::thread;
1576
1577        let tm = TransactionManager::new();
1578        let txn = tm.begin_read_write();
1579
1580        // Sleep a bit to ensure measurable duration
1581        thread::sleep(Duration::from_millis(10));
1582
1583        txn.commit().unwrap();
1584
1585        let metrics = tm.metrics();
1586        assert_eq!(metrics.active_count, 0);
1587        assert_eq!(metrics.total_commits, 1);
1588        assert_eq!(metrics.total_rollbacks, 0);
1589        assert_eq!(metrics.total_transactions, 1);
1590        assert!(metrics.avg_duration_us > 0);
1591        assert_eq!(metrics.commit_rate(), 1.0);
1592    }
1593
1594    #[test]
1595    fn test_transaction_metrics_rollback() {
1596        use std::thread;
1597
1598        let tm = TransactionManager::new();
1599        let txn = tm.begin_read_write();
1600
1601        thread::sleep(Duration::from_millis(10));
1602
1603        txn.abort().unwrap();
1604
1605        let metrics = tm.metrics();
1606        assert_eq!(metrics.active_count, 0);
1607        assert_eq!(metrics.total_commits, 0);
1608        assert_eq!(metrics.total_rollbacks, 1);
1609        assert_eq!(metrics.total_transactions, 1);
1610        assert!(metrics.avg_duration_us > 0);
1611        assert_eq!(metrics.commit_rate(), 0.0);
1612    }
1613
1614    #[test]
1615    fn test_transaction_metrics_mixed() {
1616        use std::thread;
1617
1618        let tm = TransactionManager::new();
1619
1620        // Commit 3 transactions
1621        for _ in 0..3 {
1622            let txn = tm.begin_read_write();
1623            thread::sleep(Duration::from_millis(5));
1624            txn.commit().unwrap();
1625        }
1626
1627        // Rollback 1 transaction
1628        let txn = tm.begin_read_write();
1629        thread::sleep(Duration::from_millis(5));
1630        txn.abort().unwrap();
1631
1632        let metrics = tm.metrics();
1633        assert_eq!(metrics.active_count, 0);
1634        assert_eq!(metrics.total_commits, 3);
1635        assert_eq!(metrics.total_rollbacks, 1);
1636        assert_eq!(metrics.total_transactions, 4);
1637        assert!(metrics.avg_duration_us > 0);
1638        assert_eq!(metrics.commit_rate(), 0.75); // 3/4
1639    }
1640
1641    #[test]
1642    fn test_transaction_metrics_long_running() {
1643        use std::thread;
1644
1645        let tm = TransactionManager::new();
1646        let _txn1 = tm.begin_read_write();
1647
1648        // Sleep past the 30s threshold used in metrics() method
1649        // But we'll use a shorter sleep and just check the structure
1650        thread::sleep(Duration::from_millis(10));
1651
1652        let _txn2 = tm.begin_read_only();
1653
1654        let metrics = tm.metrics();
1655        assert_eq!(metrics.active_count, 2);
1656        // long_running_count depends on the 30s threshold in metrics()
1657        // so it will be 0 in this quick test
1658        assert_eq!(metrics.long_running_count, 0);
1659    }
1660
1661    #[test]
1662    fn test_transaction_metrics_snapshot_commit_rate() {
1663        let snapshot = TransactionMetricsSnapshot {
1664            active_count: 2,
1665            total_commits: 7,
1666            total_rollbacks: 3,
1667            long_running_count: 1,
1668            avg_duration_us: 1000,
1669            total_transactions: 10,
1670        };
1671
1672        assert_eq!(snapshot.commit_rate(), 0.7);
1673    }
1674
1675    #[test]
1676    fn test_transaction_metrics_snapshot_commit_rate_zero_transactions() {
1677        let snapshot = TransactionMetricsSnapshot {
1678            active_count: 0,
1679            total_commits: 0,
1680            total_rollbacks: 0,
1681            long_running_count: 0,
1682            avg_duration_us: 0,
1683            total_transactions: 0,
1684        };
1685
1686        assert_eq!(snapshot.commit_rate(), 0.0);
1687    }
1688
1689    #[test]
1690    fn test_transaction_metrics_average_duration() {
1691        use std::thread;
1692
1693        let tm = TransactionManager::new();
1694
1695        // Create several transactions with different durations
1696        let txn1 = tm.begin_read_write();
1697        thread::sleep(Duration::from_millis(10));
1698        txn1.commit().unwrap();
1699
1700        let txn2 = tm.begin_read_write();
1701        thread::sleep(Duration::from_millis(20));
1702        txn2.commit().unwrap();
1703
1704        let metrics = tm.metrics();
1705        assert_eq!(metrics.total_transactions, 2);
1706        // Average should be at least 10ms (in microseconds)
1707        // No upper bound — CI runners have unpredictable scheduling latency
1708        assert!(metrics.avg_duration_us >= 10_000);
1709    }
1710
1711    #[test]
1712    fn test_transaction_metrics_auto_abort() {
1713        let tm = TransactionManager::new();
1714
1715        {
1716            let _txn = tm.begin_read_write();
1717            // txn dropped here without commit - should auto-abort
1718        }
1719
1720        let metrics = tm.metrics();
1721        assert_eq!(metrics.active_count, 0);
1722        assert_eq!(metrics.total_commits, 0);
1723        assert_eq!(metrics.total_rollbacks, 1);
1724        assert_eq!(metrics.total_transactions, 1);
1725    }
1726}