Skip to main content

reddb_server/storage/wal/
transaction.rs

1//! Transaction Manager
2//!
3//! Provides ACID transaction support for the RedDB storage engine.
4//!
5//! # Transaction Lifecycle
6//!
7//! 1. Begin: Allocate transaction ID, write Begin record to WAL
8//! 2. Read/Write: Track page reads and buffer page writes
9//! 3. Commit: Write Commit record to WAL, sync WAL
10//! 4. Rollback: Write Rollback record to WAL, discard buffered writes
11//!
12//! # Isolation Level
13//!
14//! Currently implements Read Committed isolation:
15//! - Reads see committed data at the start of the statement
16//! - No dirty reads
17//! - Possible non-repeatable reads
18//!
19//! # References
20//!
21//! - Turso `core/transaction.rs` - Transaction implementation
22//! - SQLite transaction documentation
23
24use std::collections::HashMap;
25use std::io;
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
29
30use parking_lot::{Mutex, MutexGuard};
31
32use super::append_coordinator::WalAppendCoordinator;
33use super::record::WalRecord;
34use super::writer::WalWriter;
35use crate::storage::engine::{Page, Pager, PAGE_SIZE};
36
/// Process-wide source of transaction IDs; the first ID handed out is 1.
static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);

/// Hand out the next transaction ID.
///
/// Atomically returns the counter's current value and advances it by one,
/// so successive calls yield strictly increasing IDs.
fn next_transaction_id() -> u64 {
    let counter = &NEXT_TX_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
44
/// Lifecycle phase of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxState {
    /// Open: page reads and writes are accepted.
    Active,
    /// Finished by a commit.
    Committed,
    /// Finished by a rollback.
    Aborted,
}
55
/// Everything that can go wrong while driving a transaction.
#[derive(Debug)]
pub enum TxError {
    /// An underlying I/O operation failed.
    Io(io::Error),
    /// The pager reported a failure; carries its message text.
    Pager(String),
    /// An internal lock was poisoned by a panic; carries the lock's name.
    LockPoisoned(&'static str),
    /// The transaction is not in the active state.
    NotActive,
    /// The transaction has already been committed.
    AlreadyCommitted,
    /// The transaction has already been rolled back.
    AlreadyAborted,
    /// A write conflict was detected on the given page ID.
    WriteConflict(u32),
    /// Page data was rejected; carries a description of the problem.
    InvalidPage(String),
}

impl std::fmt::Display for TxError {
    /// Render a short human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants without a payload are fixed strings.
            Self::NotActive => f.write_str("Transaction is not active"),
            Self::AlreadyCommitted => f.write_str("Transaction already committed"),
            Self::AlreadyAborted => f.write_str("Transaction already aborted"),
            // Variants with a payload splice it into the message.
            Self::Io(source) => write!(f, "I/O error: {}", source),
            Self::Pager(detail) => write!(f, "Pager error: {}", detail),
            Self::LockPoisoned(lock_name) => write!(f, "Lock poisoned: {}", lock_name),
            Self::WriteConflict(page) => write!(f, "Write conflict on page {}", page),
            Self::InvalidPage(detail) => write!(f, "Invalid page: {}", detail),
        }
    }
}

impl std::error::Error for TxError {}

impl From<io::Error> for TxError {
    /// Wrap an I/O error so `?` can lift it into a [`TxError`].
    fn from(source: io::Error) -> Self {
        TxError::Io(source)
    }
}
99
100/// A buffered page write
101#[derive(Clone)]
102struct BufferedWrite {
103    page_id: u32,
104    data: [u8; PAGE_SIZE],
105}
106
107/// A single transaction
108///
109/// Transactions buffer writes and commit them atomically to the WAL.
110pub struct Transaction {
111    /// Transaction ID
112    id: u64,
113    /// Transaction state
114    state: TxState,
115    /// Buffered page writes (page_id -> page data)
116    write_set: HashMap<u32, BufferedWrite>,
117    /// Pages read in this transaction (for conflict detection)
118    read_set: Vec<u32>,
119    /// Reference to the transaction manager
120    manager: Arc<TransactionManager>,
121}
122
123impl Transaction {
124    /// Get transaction ID
125    pub fn id(&self) -> u64 {
126        self.id
127    }
128
129    /// Get transaction state
130    pub fn state(&self) -> TxState {
131        self.state
132    }
133
134    /// Check if transaction is active
135    pub fn is_active(&self) -> bool {
136        self.state == TxState::Active
137    }
138
139    /// Read a page through this transaction
140    ///
141    /// If the page has been written in this transaction, returns the buffered version.
142    /// Otherwise, reads from the pager.
143    pub fn read_page(&mut self, page_id: u32) -> Result<Page, TxError> {
144        if self.state != TxState::Active {
145            return Err(TxError::NotActive);
146        }
147
148        // Check write set first
149        if let Some(buffered) = self.write_set.get(&page_id) {
150            return Ok(Page::from_bytes(buffered.data));
151        }
152
153        // Track the read
154        self.read_set.push(page_id);
155
156        // Read from pager
157        self.manager
158            .pager
159            .read_page(page_id)
160            .map_err(|e| TxError::Pager(e.to_string()))
161    }
162
163    /// Write a page through this transaction
164    ///
165    /// The write is buffered and will be committed to the WAL on commit.
166    pub fn write_page(&mut self, page_id: u32, page: Page) -> Result<(), TxError> {
167        if self.state != TxState::Active {
168            return Err(TxError::NotActive);
169        }
170
171        // Buffer the write
172        let mut data = [0u8; PAGE_SIZE];
173        data.copy_from_slice(page.as_bytes());
174
175        self.write_set
176            .insert(page_id, BufferedWrite { page_id, data });
177
178        Ok(())
179    }
180
181    /// Commit the transaction
182    ///
183    /// Writes all buffered pages to the WAL, then writes a Commit record.
184    ///
185    /// **Read-only fast path:** when `write_set` is empty, the
186    /// transaction wrote nothing, so there is nothing to make
187    /// durable. We skip the WAL append, the `wal.sync()` (which costs
188    /// ~100 µs of fsync), and the pager apply loop entirely. The
189    /// transaction still transitions to `Committed` and unregisters
190    /// from the manager so subsequent state checks work correctly.
191    /// This mirrors postgres' optimisation in `RecordTransactionCommit`
192    /// (`xact.c`) which skips `XLogFlush` when nothing was written.
193    pub fn commit(mut self) -> Result<(), TxError> {
194        if self.state != TxState::Active {
195            return match self.state {
196                TxState::Committed => Err(TxError::AlreadyCommitted),
197                TxState::Aborted => Err(TxError::AlreadyAborted),
198                _ => Err(TxError::NotActive),
199            };
200        }
201
202        // ── Read-only fast path ─────────────────────────────────────
203        // No writes → no WAL record → no fsync. Saves ~100 µs per
204        // read-only commit and removes contention on the WAL writer
205        // mutex for read-heavy workloads.
206        if self.write_set.is_empty() {
207            self.state = TxState::Committed;
208            self.manager.unregister_transaction(self.id);
209            return Ok(());
210        }
211
212        // ── Encode phase (no lock) ──────────────────────────────────
213        // Encode every WAL record into one contiguous byte blob
214        // OUTSIDE any lock. This is the bulk of the per-commit work
215        // and pays no contention cost — the old path encoded each
216        // record while holding `Mutex<WalWriter>`, which under 16-way
217        // concurrency produced the park-convoy that bottlenecked
218        // `concurrent` and `insert_sequential`.
219        let mut blob = Vec::with_capacity(64 + self.write_set.len() * (PAGE_SIZE + 32));
220        for (page_id, buffered) in &self.write_set {
221            let record = WalRecord::PageWrite {
222                tx_id: self.id,
223                page_id: *page_id,
224                data: buffered.data.to_vec(),
225            };
226            // Encode straight into the per-call blob scratch — no fresh Vec
227            // per record. `blob` is owned by this commit frame and never
228            // shared across appenders, so this is safe on the lock-free path.
229            record.encode_into(&mut blob);
230        }
231        WalRecord::Commit { tx_id: self.id }.encode_into(&mut blob);
232
233        // ── Reserve + enqueue (lock-free) ───────────────────────────
234        // One atomic fetch_add reserves our LSN range; one
235        // SegQueue::push hands the bytes to the leader. Both
236        // operations are wait-free for the writer.
237        let commit_lsn = self.manager.coordinator.reserve_and_enqueue(blob);
238
239        // ── Wait for durability ─────────────────────────────────────
240        // If `durable_lsn >= commit_lsn` already, the leader (some
241        // earlier thread) covered us — return immediately. Otherwise
242        // we either become the leader and drive the drain, or park
243        // on the coordinator's parking_lot::Condvar until a leader
244        // publishes a `durable_lsn` past our target.
245        self.manager
246            .coordinator
247            .commit_at_least(commit_lsn, &self.manager.wal)
248            .map_err(TxError::Io)?;
249
250        // Apply writes to pager cache (for immediate visibility)
251        for (page_id, buffered) in &self.write_set {
252            let page = Page::from_bytes(buffered.data);
253            self.manager
254                .pager
255                .write_page(*page_id, page)
256                .map_err(|e| TxError::Pager(e.to_string()))?;
257        }
258
259        self.state = TxState::Committed;
260
261        // Unregister from manager
262        self.manager.unregister_transaction(self.id);
263
264        Ok(())
265    }
266
267    /// Rollback the transaction
268    ///
269    /// Discards all buffered writes and writes a Rollback record to the WAL.
270    pub fn rollback(mut self) -> Result<(), TxError> {
271        if self.state != TxState::Active {
272            return match self.state {
273                TxState::Committed => Err(TxError::AlreadyCommitted),
274                TxState::Aborted => Err(TxError::AlreadyAborted),
275                _ => Err(TxError::NotActive),
276            };
277        }
278
279        // Route rollback through the coordinator so its bytes land
280        // in LSN order with any concurrent commits. Going around the
281        // coordinator (direct `wal.lock().append`) would race with
282        // the leader's `append_bytes` and corrupt the file.
283        let blob = WalRecord::Rollback { tx_id: self.id }.encode();
284        let target = self.manager.coordinator.reserve_and_enqueue(blob);
285        self.manager
286            .coordinator
287            .commit_at_least(target, &self.manager.wal)
288            .map_err(TxError::Io)?;
289
290        // Clear write set
291        self.write_set.clear();
292        self.state = TxState::Aborted;
293
294        // Unregister from manager
295        self.manager.unregister_transaction(self.id);
296
297        Ok(())
298    }
299}
300
301impl Drop for Transaction {
302    fn drop(&mut self) {
303        // If transaction is still active when dropped, it means it was neither
304        // committed nor rolled back. This is a bug, but we'll clean up anyway.
305        if self.state == TxState::Active {
306            // Best-effort rollback through the coordinator. We can't
307            // bypass the coordinator with a direct `wal.lock()` here:
308            // any in-flight reservations would still be holding LSN
309            // slots ahead of us and the file would gain a hole. The
310            // coordinator handles ordering correctly even from Drop.
311            let blob = WalRecord::Rollback { tx_id: self.id }.encode();
312            let target = self.manager.coordinator.reserve_and_enqueue(blob);
313            let _ = self
314                .manager
315                .coordinator
316                .commit_at_least(target, &self.manager.wal);
317            self.manager.unregister_transaction(self.id);
318        }
319    }
320}
321
322/// Transaction Manager
323///
324/// Coordinates transactions and manages the WAL.
325pub struct TransactionManager {
326    /// Pager for reading/writing pages
327    pager: Arc<Pager>,
328    /// WAL writer protected by a `parking_lot::Mutex`. The mutex is
329    /// taken only by the leader-flush path inside the coordinator;
330    /// concurrent writers no longer contend on it during the
331    /// encode-and-append step. See [`WalAppendCoordinator`].
332    wal: Mutex<WalWriter>,
333    /// WAL file path
334    wal_path: PathBuf,
335    /// Active transaction IDs
336    active_transactions: RwLock<Vec<u64>>,
337    /// Lock-free append coordinator. Replaces the per-commit
338    /// `wal.lock()` that used to serialise 16 concurrent writers
339    /// (Roadmap #2 / issue #157). Writers reserve an LSN range via
340    /// atomic fetch_add, push their encoded bytes onto a SegQueue,
341    /// then call `commit_at_least` to wait for durability. The
342    /// first thread into `commit_at_least` becomes the leader and
343    /// drives the WAL drain + fsync; everyone else parks on the
344    /// coordinator's condvar.
345    coordinator: WalAppendCoordinator,
346}
347
348impl TransactionManager {
349    /// Create a new transaction manager
350    ///
351    /// # Arguments
352    ///
353    /// * `pager` - The pager to use for page I/O
354    /// * `wal_path` - Path to the WAL file
355    pub fn new(pager: Arc<Pager>, wal_path: impl AsRef<Path>) -> io::Result<Self> {
356        let wal_path = wal_path.as_ref().to_path_buf();
357        let wal = WalWriter::open(&wal_path)?;
358        let initial_current = wal.current_lsn();
359        let initial_durable = wal.durable_lsn();
360
361        Ok(Self {
362            pager,
363            wal: Mutex::new(wal),
364            wal_path,
365            active_transactions: RwLock::new(Vec::new()),
366            coordinator: WalAppendCoordinator::new(initial_current, initial_durable),
367        })
368    }
369
370    fn wal_writer(&self) -> Result<MutexGuard<'_, WalWriter>, TxError> {
371        // parking_lot::Mutex does not poison on panic, so this is
372        // infallible. We keep the `Result` return type to avoid
373        // touching every call site, but the error variant is now
374        // unreachable in normal operation. Tests that previously
375        // poisoned the std::sync::Mutex deliberately have been
376        // adjusted to assert the new non-poisoning behaviour.
377        Ok(self.wal.lock())
378    }
379
380    fn active_transactions_write(&self) -> RwLockWriteGuard<'_, Vec<u64>> {
381        self.active_transactions
382            .write()
383            .unwrap_or_else(|poisoned| poisoned.into_inner())
384    }
385
386    fn active_transactions_read(&self) -> RwLockReadGuard<'_, Vec<u64>> {
387        self.active_transactions
388            .read()
389            .unwrap_or_else(|poisoned| poisoned.into_inner())
390    }
391
392    /// Begin a new transaction
393    pub fn begin(self: &Arc<Self>) -> Result<Transaction, TxError> {
394        let tx_id = next_transaction_id();
395
396        // Route the Begin record through the coordinator (same
397        // ordering guarantees as commit/rollback). Begin is not
398        // strictly required to be durable before subsequent appends
399        // — recovery treats a Begin without a matching Commit as a
400        // rolled-back txn — so we don't wait on `commit_at_least`.
401        // The bytes are queued and the next leader picks them up
402        // alongside our own Commit record when we eventually commit.
403        let blob = WalRecord::Begin { tx_id }.encode();
404        let _begin_lsn = self.coordinator.reserve_and_enqueue(blob);
405
406        // Register transaction
407        {
408            let mut active = self.active_transactions_write();
409            active.push(tx_id);
410        }
411
412        Ok(Transaction {
413            id: tx_id,
414            state: TxState::Active,
415            write_set: HashMap::new(),
416            read_set: Vec::new(),
417            manager: Arc::clone(self),
418        })
419    }
420
421    /// Unregister a transaction (called on commit/rollback)
422    fn unregister_transaction(&self, tx_id: u64) {
423        let mut active = self.active_transactions_write();
424        active.retain(|&id| id != tx_id);
425    }
426
427    /// Get list of active transaction IDs
428    pub fn active_transactions(&self) -> Vec<u64> {
429        self.active_transactions_read().clone()
430    }
431
432    /// Get WAL file path
433    pub fn wal_path(&self) -> &Path {
434        &self.wal_path
435    }
436
437    /// Get reference to pager
438    pub fn pager(&self) -> &Arc<Pager> {
439        &self.pager
440    }
441
442    /// Sync WAL to disk. Drains every byte that has been reserved
443    /// via the coordinator — i.e. waits until the coordinator's
444    /// `next_lsn` is durable.
445    pub fn sync_wal(&self) -> io::Result<()> {
446        let target = self.coordinator.next_lsn();
447        self.coordinator.commit_at_least(target, &self.wal)
448    }
449
450    /// Check if there are active transactions
451    pub fn has_active_transactions(&self) -> bool {
452        !self.active_transactions_read().is_empty()
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use super::*;
459    use crate::storage::engine::PageType;
460    use std::fs;
461    use std::time::{SystemTime, UNIX_EPOCH};
462
463    fn temp_dir() -> PathBuf {
464        let timestamp = SystemTime::now()
465            .duration_since(UNIX_EPOCH)
466            .expect("value is present")
467            .as_nanos();
468        std::env::temp_dir().join(format!("reddb_tx_test_{}", timestamp))
469    }
470
471    fn cleanup(dir: &Path) {
472        let _ = fs::remove_dir_all(dir);
473    }
474
475    fn temp_wal_path(dir: &Path, name: &str) -> PathBuf {
476        reddb_file::layout::wal_component_temp_path(dir, "transaction", name, std::process::id())
477    }
478
479    #[test]
480    fn test_transaction_commit() {
481        let dir = temp_dir();
482        let _ = fs::create_dir_all(&dir);
483        let db_path = dir.join("test.db");
484        let wal_path = temp_wal_path(&dir, "commit");
485
486        // Create pager
487        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
488
489        // Allocate a page
490        let page = pager
491            .allocate_page(PageType::BTreeLeaf)
492            .expect("allocate_page() should succeed");
493        let page_id = page.page_id();
494
495        // Create transaction manager
496        let tm = Arc::new(
497            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
498        );
499
500        // Begin transaction
501        let mut tx = tm.begin().expect("begin() should succeed");
502        assert!(tx.is_active());
503
504        // Write through transaction
505        let mut page = Page::new(PageType::BTreeLeaf, page_id);
506        page.as_bytes_mut()[100] = 0xAB;
507        tx.write_page(page_id, page)
508            .expect("write_page() should succeed");
509
510        // Read through transaction (should see buffered write)
511        let read_page = tx.read_page(page_id).expect("read_page() should succeed");
512        assert_eq!(read_page.as_bytes()[100], 0xAB);
513
514        // Commit
515        tx.commit().expect("commit() should succeed");
516
517        // Verify write is visible through pager
518        let final_page = pager
519            .read_page(page_id)
520            .expect("read_page() should succeed");
521        assert_eq!(final_page.as_bytes()[100], 0xAB);
522
523        cleanup(&dir);
524    }
525
526    #[test]
527    fn test_transaction_rollback() {
528        let dir = temp_dir();
529        let _ = fs::create_dir_all(&dir);
530        let db_path = dir.join("test.db");
531        let wal_path = temp_wal_path(&dir, "rollback");
532
533        // Create pager
534        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
535
536        // Allocate a page and write initial value
537        let mut page = pager
538            .allocate_page(PageType::BTreeLeaf)
539            .expect("allocate_page() should succeed");
540        let page_id = page.page_id();
541        page.as_bytes_mut()[100] = 0x11;
542        pager
543            .write_page(page_id, page)
544            .expect("write_page() should succeed");
545
546        // Create transaction manager
547        let tm = Arc::new(
548            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
549        );
550
551        // Begin transaction
552        let mut tx = tm.begin().expect("begin() should succeed");
553
554        // Write through transaction
555        let mut page = Page::new(PageType::BTreeLeaf, page_id);
556        page.as_bytes_mut()[100] = 0xAB;
557        tx.write_page(page_id, page)
558            .expect("write_page() should succeed");
559
560        // Read through transaction (should see buffered write)
561        let read_page = tx.read_page(page_id).expect("read_page() should succeed");
562        assert_eq!(read_page.as_bytes()[100], 0xAB);
563
564        // Rollback
565        tx.rollback().expect("rollback() should succeed");
566
567        // Original value should be preserved
568        let final_page = pager
569            .read_page(page_id)
570            .expect("read_page() should succeed");
571        assert_eq!(final_page.as_bytes()[100], 0x11);
572
573        cleanup(&dir);
574    }
575
576    #[test]
577    fn test_multiple_transactions() {
578        let dir = temp_dir();
579        let _ = fs::create_dir_all(&dir);
580        let db_path = dir.join("test.db");
581        let wal_path = temp_wal_path(&dir, "multiple");
582
583        // Create pager
584        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
585
586        // Allocate two pages
587        let page1 = pager
588            .allocate_page(PageType::BTreeLeaf)
589            .expect("allocate_page() should succeed");
590        let page2 = pager
591            .allocate_page(PageType::BTreeLeaf)
592            .expect("allocate_page() should succeed");
593        let page1_id = page1.page_id();
594        let page2_id = page2.page_id();
595
596        // Create transaction manager
597        let tm = Arc::new(
598            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
599        );
600
601        // Transaction 1: Write to page 1
602        let mut tx1 = tm.begin().expect("begin() should succeed");
603        let mut page1 = Page::new(PageType::BTreeLeaf, page1_id);
604        page1.as_bytes_mut()[100] = 0x11;
605        tx1.write_page(page1_id, page1)
606            .expect("write_page() should succeed");
607        tx1.commit().expect("commit() should succeed");
608
609        // Transaction 2: Write to page 2
610        let mut tx2 = tm.begin().expect("begin() should succeed");
611        let mut page2 = Page::new(PageType::BTreeLeaf, page2_id);
612        page2.as_bytes_mut()[100] = 0x22;
613        tx2.write_page(page2_id, page2)
614            .expect("write_page() should succeed");
615        tx2.commit().expect("commit() should succeed");
616
617        // Verify both writes
618        let final_page1 = pager
619            .read_page(page1_id)
620            .expect("read_page() should succeed");
621        let final_page2 = pager
622            .read_page(page2_id)
623            .expect("read_page() should succeed");
624        assert_eq!(final_page1.as_bytes()[100], 0x11);
625        assert_eq!(final_page2.as_bytes()[100], 0x22);
626
627        cleanup(&dir);
628    }
629
630    #[test]
631    fn test_transaction_isolation() {
632        let dir = temp_dir();
633        let _ = fs::create_dir_all(&dir);
634        let db_path = dir.join("test.db");
635        let wal_path = temp_wal_path(&dir, "isolation");
636
637        // Create pager
638        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
639
640        // Allocate a page with initial value
641        let mut page = pager
642            .allocate_page(PageType::BTreeLeaf)
643            .expect("allocate_page() should succeed");
644        let page_id = page.page_id();
645        page.as_bytes_mut()[100] = 0x00;
646        pager
647            .write_page(page_id, page)
648            .expect("write_page() should succeed");
649
650        // Create transaction manager
651        let tm = Arc::new(
652            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
653        );
654
655        // Transaction 1: Begin and write (but don't commit yet)
656        let mut tx1 = tm.begin().expect("begin() should succeed");
657        let mut page1 = Page::new(PageType::BTreeLeaf, page_id);
658        page1.as_bytes_mut()[100] = 0x11;
659        tx1.write_page(page_id, page1)
660            .expect("write_page() should succeed");
661
662        // Transaction 1 should see its own write
663        let tx1_read = tx1.read_page(page_id).expect("read_page() should succeed");
664        assert_eq!(tx1_read.as_bytes()[100], 0x11);
665
666        // Another read from pager should not see uncommitted write
667        let pager_read = pager
668            .read_page(page_id)
669            .expect("read_page() should succeed");
670        assert_eq!(pager_read.as_bytes()[100], 0x00);
671
672        // Commit tx1
673        tx1.commit().expect("commit() should succeed");
674
675        // Now pager should see the write
676        let final_read = pager
677            .read_page(page_id)
678            .expect("read_page() should succeed");
679        assert_eq!(final_read.as_bytes()[100], 0x11);
680
681        cleanup(&dir);
682    }
683
684    #[test]
685    fn test_active_transaction_tracking() {
686        let dir = temp_dir();
687        let _ = fs::create_dir_all(&dir);
688        let db_path = dir.join("test.db");
689        let wal_path = temp_wal_path(&dir, "tracking");
690
691        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
692        let tm = Arc::new(
693            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
694        );
695
696        assert!(!tm.has_active_transactions());
697
698        let tx1 = tm.begin().expect("begin() should succeed");
699        let tx1_id = tx1.id();
700        assert!(tm.has_active_transactions());
701        assert!(tm.active_transactions().contains(&tx1_id));
702
703        let tx2 = tm.begin().expect("begin() should succeed");
704        let tx2_id = tx2.id();
705        assert_eq!(tm.active_transactions().len(), 2);
706
707        tx1.commit().expect("commit() should succeed");
708        assert!(!tm.active_transactions().contains(&tx1_id));
709        assert!(tm.active_transactions().contains(&tx2_id));
710
711        tx2.rollback().expect("rollback() should succeed");
712        assert!(!tm.has_active_transactions());
713
714        cleanup(&dir);
715    }
716
717    #[test]
718    fn test_transaction_double_commit() {
719        let dir = temp_dir();
720        let _ = fs::create_dir_all(&dir);
721        let db_path = dir.join("test.db");
722        let wal_path = temp_wal_path(&dir, "double-commit");
723
724        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
725        let tm = Arc::new(
726            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
727        );
728
729        // The transaction is consumed on commit, so double commit is impossible at compile time
730        // This test just verifies commit works
731        let tx = tm.begin().expect("begin() should succeed");
732        tx.commit().expect("commit() should succeed");
733
734        cleanup(&dir);
735    }
736
737    #[test]
738    fn test_begin_succeeds_after_panic_in_lock_holder() {
739        // After Roadmap #2 the WAL mutex is `parking_lot::Mutex`,
740        // which does NOT poison on panic. A previous version of this
741        // test asserted that a panicking thread holding the lock
742        // produced a `TxError::LockPoisoned` on the next `begin()` —
743        // that was a property of `std::sync::Mutex`. The new
744        // contract is the opposite: writers continue uninterrupted.
745        let dir = temp_dir();
746        let _ = fs::create_dir_all(&dir);
747        let db_path = dir.join("test.db");
748        let wal_path = temp_wal_path(&dir, "panic-lock-holder");
749
750        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
751        let tm = Arc::new(
752            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
753        );
754
755        let poison_target = Arc::clone(&tm);
756        let _ = std::thread::spawn(move || {
757            let _guard = poison_target.wal.lock();
758            panic!("would-poison the wal mutex on std::sync");
759        })
760        .join();
761
762        // parking_lot recovers transparently. begin() must succeed.
763        match tm.begin() {
764            Ok(_) => {}
765            Err(err) => panic!("begin must succeed despite prior panic: {err:?}"),
766        }
767
768        cleanup(&dir);
769    }
770
771    // ---------------------------------------------------------------
772    // Perf 1.2: read-only commit fast path
773    // ---------------------------------------------------------------
774
775    #[test]
776    fn read_only_commit_does_not_advance_durable_lsn() {
777        let dir = temp_dir();
778        let _ = fs::create_dir_all(&dir);
779        let db_path = dir.join("ro_durable.db");
780        let wal_path = temp_wal_path(&dir, "ro-durable");
781
782        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
783        let tm = Arc::new(
784            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
785        );
786
787        // Snapshot the WAL durable_lsn BEFORE the txn.
788        let before = {
789            let wal = tm.wal_writer().expect("wal_writer() should succeed");
790            wal.durable_lsn()
791        };
792
793        let tx = tm.begin().expect("begin() should succeed");
794        // Empty write_set on purpose — read-only.
795        tx.commit().expect("commit() should succeed");
796
797        // After RO commit, the WAL durable_lsn must NOT have advanced.
798        // No Begin record, no Commit record, no fsync.
799        let after = {
800            let wal = tm.wal_writer().expect("wal_writer() should succeed");
801            wal.durable_lsn()
802        };
803        assert_eq!(
804            before, after,
805            "read-only commit must not advance durable_lsn (was {} → {})",
806            before, after
807        );
808
809        cleanup(&dir);
810    }
811
812    #[test]
813    fn read_only_commit_does_not_grow_wal_file() {
814        let dir = temp_dir();
815        let _ = fs::create_dir_all(&dir);
816        let db_path = dir.join("ro_size.db");
817        let wal_path = temp_wal_path(&dir, "ro-size");
818
819        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
820        let tm = Arc::new(
821            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
822        );
823
824        // Snapshot file size after WAL header.
825        let size_before = std::fs::metadata(&wal_path)
826            .expect("metadata() should succeed")
827            .len();
828        assert_eq!(
829            size_before,
830            reddb_file::WAL_FILE_HEADER_BYTES as u64,
831            "fresh WAL must be exactly the 8-byte header"
832        );
833
834        // 100 read-only commits in a loop.
835        for _ in 0..100 {
836            let tx = tm.begin().expect("begin() should succeed");
837            tx.commit().expect("commit() should succeed");
838        }
839
840        let size_after = std::fs::metadata(&wal_path)
841            .expect("metadata() should succeed")
842            .len();
843        assert_eq!(
844            size_after, size_before,
845            "100 read-only commits should not have written any WAL bytes"
846        );
847        cleanup(&dir);
848    }
849
850    #[test]
851    fn read_only_commit_marks_transaction_committed() {
852        let dir = temp_dir();
853        let _ = fs::create_dir_all(&dir);
854        let db_path = dir.join("ro_state.db");
855        let wal_path = temp_wal_path(&dir, "ro-state");
856
857        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
858        let tm = Arc::new(
859            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
860        );
861
862        let tx = tm.begin().expect("begin() should succeed");
863        let id = tx.id();
864        tx.commit().expect("commit() should succeed");
865
866        // Manager must have unregistered this txn — the active list
867        // no longer contains its id.
868        assert!(
869            !tm.active_transactions().contains(&id),
870            "RO-committed txn {id} must no longer be active in the manager"
871        );
872
873        cleanup(&dir);
874    }
875
876    #[test]
877    fn writing_commit_still_syncs_after_ro_fast_path() {
878        // Sanity: the fast path must NOT short-circuit a transaction
879        // that did write something. Verify the writing commit path
880        // still flushes WAL and the value lands in the pager.
881        let dir = temp_dir();
882        let _ = fs::create_dir_all(&dir);
883        let db_path = dir.join("rw_after_ro.db");
884        let wal_path = temp_wal_path(&dir, "rw-after-ro");
885
886        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
887        let allocated = pager
888            .allocate_page(PageType::BTreeLeaf)
889            .expect("allocate_page() should succeed");
890        let page_id = allocated.page_id();
891        let tm = Arc::new(
892            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
893        );
894
895        // First a RO commit (must take the fast path).
896        let ro = tm.begin().expect("begin() should succeed");
897        ro.commit().expect("commit() should succeed");
898
899        // Then a real writing commit.
900        let mut rw = tm.begin().expect("begin() should succeed");
901        let mut page = Page::new(PageType::BTreeLeaf, page_id);
902        page.as_bytes_mut()[42] = 0x77;
903        rw.write_page(page_id, page)
904            .expect("write_page() should succeed");
905        rw.commit().expect("commit() should succeed");
906
907        // The WAL file must now contain bytes (PageWrite + Commit
908        // records, and the BufWriter has been flushed by sync()).
909        let size = std::fs::metadata(&wal_path)
910            .expect("metadata() should succeed")
911            .len();
912        assert!(size > 8, "writing commit should grow the WAL");
913
914        // The pager cache must reflect the write.
915        let read_back = pager
916            .read_page(page_id)
917            .expect("read_page() should succeed");
918        assert_eq!(read_back.as_bytes()[42], 0x77);
919
920        cleanup(&dir);
921    }
922}