// reddb_server/storage/wal/transaction.rs
use std::collections::HashMap;
25use std::io;
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
29
30use parking_lot::{Mutex, MutexGuard};
31
32use super::append_coordinator::WalAppendCoordinator;
33use super::record::WalRecord;
34use super::writer::WalWriter;
35use crate::storage::engine::{Page, Pager, PAGE_SIZE};
36
/// Process-wide monotonically increasing transaction-id counter (starts at 1).
static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);

/// Allocate the next unique transaction id.
///
/// `fetch_add` makes allocation atomic across threads; ids are unique for the
/// lifetime of the process (the counter is not persisted — TODO confirm that
/// recovery does not rely on ids being unique across restarts).
fn next_transaction_id() -> u64 {
    NEXT_TX_ID.fetch_add(1, Ordering::SeqCst)
}
44
/// Lifecycle state of a [`Transaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxState {
    /// Begun and able to read, write, commit, or roll back.
    Active,
    /// Successfully committed; no further operations are allowed.
    Committed,
    /// Rolled back; buffered writes were discarded.
    Aborted,
}
55
/// Errors produced by transaction operations.
#[derive(Debug)]
pub enum TxError {
    /// Underlying I/O failure (e.g. WAL append or flush).
    Io(io::Error),
    /// Pager-level failure, carried as its stringified message.
    Pager(String),
    /// A lock guarding shared state was poisoned.
    /// NOTE(review): never constructed in this file — the WAL mutex is
    /// parking_lot (non-poisoning) and the RwLock recovers from poison.
    /// Possibly vestigial; confirm before removing.
    LockPoisoned(&'static str),
    /// Operation attempted on a transaction that is no longer `Active`.
    NotActive,
    /// Commit/rollback called on an already-committed transaction.
    AlreadyCommitted,
    /// Commit/rollback called on an already-aborted transaction.
    AlreadyAborted,
    /// Write conflict on the given page id.
    /// NOTE(review): never constructed in this file; conflict detection over
    /// `read_set` appears unimplemented — confirm intent.
    WriteConflict(u32),
    /// A page failed validation.
    InvalidPage(String),
}
76
77impl std::fmt::Display for TxError {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 Self::Io(e) => write!(f, "I/O error: {}", e),
81 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
82 Self::LockPoisoned(name) => write!(f, "Lock poisoned: {}", name),
83 Self::NotActive => write!(f, "Transaction is not active"),
84 Self::AlreadyCommitted => write!(f, "Transaction already committed"),
85 Self::AlreadyAborted => write!(f, "Transaction already aborted"),
86 Self::WriteConflict(page_id) => write!(f, "Write conflict on page {}", page_id),
87 Self::InvalidPage(msg) => write!(f, "Invalid page: {}", msg),
88 }
89 }
90}
91
// Marker impl: `TxError` already provides Display/Debug, so the default
// `std::error::Error` methods suffice.
impl std::error::Error for TxError {}
93
94impl From<io::Error> for TxError {
95 fn from(e: io::Error) -> Self {
96 Self::Io(e)
97 }
98}
99
/// A full page image buffered inside a transaction's write set.
#[derive(Clone)]
struct BufferedWrite {
    /// Target page id.
    /// NOTE(review): duplicates the `write_set` map key — kept for now.
    page_id: u32,
    /// Complete page image to be WAL-logged and applied to the pager on commit.
    data: [u8; PAGE_SIZE],
}
106
/// A single transaction: writes are buffered in memory and made durable
/// atomically at commit via the WAL, then applied to the pager.
pub struct Transaction {
    /// Unique id from the process-wide counter.
    id: u64,
    /// Current lifecycle state.
    state: TxState,
    /// Buffered full-page writes keyed by page id (read-your-own-writes).
    write_set: HashMap<u32, BufferedWrite>,
    /// Page ids read from the pager.
    /// NOTE(review): recorded but never consulted in this file — presumably
    /// intended for conflict detection; confirm.
    read_set: Vec<u32>,
    /// Owning manager providing the pager, WAL writer, and append coordinator.
    manager: Arc<TransactionManager>,
}
122
123impl Transaction {
124 pub fn id(&self) -> u64 {
126 self.id
127 }
128
129 pub fn state(&self) -> TxState {
131 self.state
132 }
133
134 pub fn is_active(&self) -> bool {
136 self.state == TxState::Active
137 }
138
139 pub fn read_page(&mut self, page_id: u32) -> Result<Page, TxError> {
144 if self.state != TxState::Active {
145 return Err(TxError::NotActive);
146 }
147
148 if let Some(buffered) = self.write_set.get(&page_id) {
150 return Ok(Page::from_bytes(buffered.data));
151 }
152
153 self.read_set.push(page_id);
155
156 self.manager
158 .pager
159 .read_page(page_id)
160 .map_err(|e| TxError::Pager(e.to_string()))
161 }
162
163 pub fn write_page(&mut self, page_id: u32, page: Page) -> Result<(), TxError> {
167 if self.state != TxState::Active {
168 return Err(TxError::NotActive);
169 }
170
171 let mut data = [0u8; PAGE_SIZE];
173 data.copy_from_slice(page.as_bytes());
174
175 self.write_set
176 .insert(page_id, BufferedWrite { page_id, data });
177
178 Ok(())
179 }
180
181 pub fn commit(mut self) -> Result<(), TxError> {
194 if self.state != TxState::Active {
195 return match self.state {
196 TxState::Committed => Err(TxError::AlreadyCommitted),
197 TxState::Aborted => Err(TxError::AlreadyAborted),
198 _ => Err(TxError::NotActive),
199 };
200 }
201
202 if self.write_set.is_empty() {
207 self.state = TxState::Committed;
208 self.manager.unregister_transaction(self.id);
209 return Ok(());
210 }
211
212 let mut blob = Vec::with_capacity(64 + self.write_set.len() * (PAGE_SIZE + 32));
220 for (page_id, buffered) in &self.write_set {
221 let record = WalRecord::PageWrite {
222 tx_id: self.id,
223 page_id: *page_id,
224 data: buffered.data.to_vec(),
225 };
226 blob.extend_from_slice(&record.encode());
227 }
228 blob.extend_from_slice(&WalRecord::Commit { tx_id: self.id }.encode());
229
230 let commit_lsn = self.manager.coordinator.reserve_and_enqueue(blob);
235
236 self.manager
243 .coordinator
244 .commit_at_least(commit_lsn, &self.manager.wal)
245 .map_err(TxError::Io)?;
246
247 for (page_id, buffered) in &self.write_set {
249 let page = Page::from_bytes(buffered.data);
250 self.manager
251 .pager
252 .write_page(*page_id, page)
253 .map_err(|e| TxError::Pager(e.to_string()))?;
254 }
255
256 self.state = TxState::Committed;
257
258 self.manager.unregister_transaction(self.id);
260
261 Ok(())
262 }
263
264 pub fn rollback(mut self) -> Result<(), TxError> {
268 if self.state != TxState::Active {
269 return match self.state {
270 TxState::Committed => Err(TxError::AlreadyCommitted),
271 TxState::Aborted => Err(TxError::AlreadyAborted),
272 _ => Err(TxError::NotActive),
273 };
274 }
275
276 let blob = WalRecord::Rollback { tx_id: self.id }.encode();
281 let target = self.manager.coordinator.reserve_and_enqueue(blob);
282 self.manager
283 .coordinator
284 .commit_at_least(target, &self.manager.wal)
285 .map_err(TxError::Io)?;
286
287 self.write_set.clear();
289 self.state = TxState::Aborted;
290
291 self.manager.unregister_transaction(self.id);
293
294 Ok(())
295 }
296}
297
impl Drop for Transaction {
    /// Safety net: a transaction dropped while still `Active` (early return,
    /// panic, or simply forgotten) is rolled back in the WAL so recovery does
    /// not treat its Begin/PageWrite records as committed work.
    fn drop(&mut self) {
        if self.state == TxState::Active {
            let blob = WalRecord::Rollback { tx_id: self.id }.encode();
            let target = self.manager.coordinator.reserve_and_enqueue(blob);
            // Best effort: drop handlers cannot propagate I/O errors, so a
            // failed flush is ignored here.
            let _ = self
                .manager
                .coordinator
                .commit_at_least(target, &self.manager.wal);
            self.manager.unregister_transaction(self.id);
        }
    }
}
318
/// Shared coordinator for all transactions over one pager and one WAL file.
pub struct TransactionManager {
    /// Page store that committed writes are applied to.
    pager: Arc<Pager>,
    /// Serialized access to the WAL file writer (parking_lot mutex, which
    /// cannot be poisoned by a panicking holder).
    wal: Mutex<WalWriter>,
    /// Location of the WAL file on disk.
    wal_path: PathBuf,
    /// Ids of transactions that have begun but not yet committed/rolled back.
    active_transactions: RwLock<Vec<u64>>,
    /// Assigns LSNs for appended records and batches flushes up to a target
    /// LSN (see `reserve_and_enqueue` / `commit_at_least` call sites).
    coordinator: WalAppendCoordinator,
}
344
345impl TransactionManager {
346 pub fn new(pager: Arc<Pager>, wal_path: impl AsRef<Path>) -> io::Result<Self> {
353 let wal_path = wal_path.as_ref().to_path_buf();
354 let wal = WalWriter::open(&wal_path)?;
355 let initial_current = wal.current_lsn();
356 let initial_durable = wal.durable_lsn();
357
358 Ok(Self {
359 pager,
360 wal: Mutex::new(wal),
361 wal_path,
362 active_transactions: RwLock::new(Vec::new()),
363 coordinator: WalAppendCoordinator::new(initial_current, initial_durable),
364 })
365 }
366
367 fn wal_writer(&self) -> Result<MutexGuard<'_, WalWriter>, TxError> {
368 Ok(self.wal.lock())
375 }
376
377 fn active_transactions_write(&self) -> RwLockWriteGuard<'_, Vec<u64>> {
378 self.active_transactions
379 .write()
380 .unwrap_or_else(|poisoned| poisoned.into_inner())
381 }
382
383 fn active_transactions_read(&self) -> RwLockReadGuard<'_, Vec<u64>> {
384 self.active_transactions
385 .read()
386 .unwrap_or_else(|poisoned| poisoned.into_inner())
387 }
388
389 pub fn begin(self: &Arc<Self>) -> Result<Transaction, TxError> {
391 let tx_id = next_transaction_id();
392
393 let blob = WalRecord::Begin { tx_id }.encode();
401 let _begin_lsn = self.coordinator.reserve_and_enqueue(blob);
402
403 {
405 let mut active = self.active_transactions_write();
406 active.push(tx_id);
407 }
408
409 Ok(Transaction {
410 id: tx_id,
411 state: TxState::Active,
412 write_set: HashMap::new(),
413 read_set: Vec::new(),
414 manager: Arc::clone(self),
415 })
416 }
417
418 fn unregister_transaction(&self, tx_id: u64) {
420 let mut active = self.active_transactions_write();
421 active.retain(|&id| id != tx_id);
422 }
423
424 pub fn active_transactions(&self) -> Vec<u64> {
426 self.active_transactions_read().clone()
427 }
428
429 pub fn wal_path(&self) -> &Path {
431 &self.wal_path
432 }
433
434 pub fn pager(&self) -> &Arc<Pager> {
436 &self.pager
437 }
438
439 pub fn sync_wal(&self) -> io::Result<()> {
443 let target = self.coordinator.next_lsn();
444 self.coordinator.commit_at_least(target, &self.wal)
445 }
446
447 pub fn has_active_transactions(&self) -> bool {
449 !self.active_transactions_read().is_empty()
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::PageType;
    use std::fs;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Unique-per-call scratch directory under the system temp dir.
    /// Uses a nanosecond timestamp for uniqueness (collisions are possible in
    /// theory but vanishingly unlikely in practice).
    fn temp_dir() -> PathBuf {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("reddb_tx_test_{}", timestamp))
    }

    /// Best-effort removal of a test's scratch directory.
    fn cleanup(dir: &Path) {
        let _ = fs::remove_dir_all(dir);
    }

    /// A committed write must be visible through the pager afterwards.
    #[test]
    fn test_transaction_commit() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());

        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();

        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let mut tx = tm.begin().unwrap();
        assert!(tx.is_active());

        let mut page = Page::new(PageType::BTreeLeaf, page_id);
        page.as_bytes_mut()[100] = 0xAB;
        tx.write_page(page_id, page).unwrap();

        // Read-your-own-writes: the buffered value is visible pre-commit.
        let read_page = tx.read_page(page_id).unwrap();
        assert_eq!(read_page.as_bytes()[100], 0xAB);

        tx.commit().unwrap();

        // After commit the pager itself reflects the write.
        let final_page = pager.read_page(page_id).unwrap();
        assert_eq!(final_page.as_bytes()[100], 0xAB);

        cleanup(&dir);
    }

    /// A rolled-back write must leave the pager's prior contents intact.
    #[test]
    fn test_transaction_rollback() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());

        // Seed the page with a known value before the transaction starts.
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();
        page.as_bytes_mut()[100] = 0x11;
        pager.write_page(page_id, page).unwrap();

        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let mut tx = tm.begin().unwrap();

        let mut page = Page::new(PageType::BTreeLeaf, page_id);
        page.as_bytes_mut()[100] = 0xAB;
        tx.write_page(page_id, page).unwrap();

        // The transaction sees its own uncommitted write...
        let read_page = tx.read_page(page_id).unwrap();
        assert_eq!(read_page.as_bytes()[100], 0xAB);

        tx.rollback().unwrap();

        // ...but after rollback the pager still holds the seeded value.
        let final_page = pager.read_page(page_id).unwrap();
        assert_eq!(final_page.as_bytes()[100], 0x11);

        cleanup(&dir);
    }

    /// Two sequential transactions commit independently to distinct pages.
    #[test]
    fn test_multiple_transactions() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());

        let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page1_id = page1.page_id();
        let page2_id = page2.page_id();

        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let mut tx1 = tm.begin().unwrap();
        let mut page1 = Page::new(PageType::BTreeLeaf, page1_id);
        page1.as_bytes_mut()[100] = 0x11;
        tx1.write_page(page1_id, page1).unwrap();
        tx1.commit().unwrap();

        let mut tx2 = tm.begin().unwrap();
        let mut page2 = Page::new(PageType::BTreeLeaf, page2_id);
        page2.as_bytes_mut()[100] = 0x22;
        tx2.write_page(page2_id, page2).unwrap();
        tx2.commit().unwrap();

        let final_page1 = pager.read_page(page1_id).unwrap();
        let final_page2 = pager.read_page(page2_id).unwrap();
        assert_eq!(final_page1.as_bytes()[100], 0x11);
        assert_eq!(final_page2.as_bytes()[100], 0x22);

        cleanup(&dir);
    }

    /// An uncommitted write is invisible through the raw pager (write
    /// buffering provides isolation until commit).
    #[test]
    fn test_transaction_isolation() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());

        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();
        page.as_bytes_mut()[100] = 0x00;
        pager.write_page(page_id, page).unwrap();

        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let mut tx1 = tm.begin().unwrap();
        let mut page1 = Page::new(PageType::BTreeLeaf, page_id);
        page1.as_bytes_mut()[100] = 0x11;
        tx1.write_page(page_id, page1).unwrap();

        // The writer sees its own buffered value...
        let tx1_read = tx1.read_page(page_id).unwrap();
        assert_eq!(tx1_read.as_bytes()[100], 0x11);

        // ...while a direct pager read still sees the pre-transaction value.
        let pager_read = pager.read_page(page_id).unwrap();
        assert_eq!(pager_read.as_bytes()[100], 0x00);

        tx1.commit().unwrap();

        let final_read = pager.read_page(page_id).unwrap();
        assert_eq!(final_read.as_bytes()[100], 0x11);

        cleanup(&dir);
    }

    /// Commit and rollback must both remove a transaction from the manager's
    /// active list.
    #[test]
    fn test_active_transaction_tracking() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        assert!(!tm.has_active_transactions());

        let tx1 = tm.begin().unwrap();
        let tx1_id = tx1.id();
        assert!(tm.has_active_transactions());
        assert!(tm.active_transactions().contains(&tx1_id));

        let tx2 = tm.begin().unwrap();
        let tx2_id = tx2.id();
        assert_eq!(tm.active_transactions().len(), 2);

        tx1.commit().unwrap();
        assert!(!tm.active_transactions().contains(&tx1_id));
        assert!(tm.active_transactions().contains(&tx2_id));

        tx2.rollback().unwrap();
        assert!(!tm.has_active_transactions());

        cleanup(&dir);
    }

    /// Commit consumes `self`, so a double commit is a compile-time error;
    /// this test only exercises the single-commit happy path.
    #[test]
    fn test_transaction_double_commit() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let tx = tm.begin().unwrap();
        tx.commit().unwrap();

        cleanup(&dir);
    }

    /// A thread that panics while holding the WAL mutex must not wedge the
    /// manager: parking_lot mutexes do not poison, so `begin` still works.
    #[test]
    fn test_begin_succeeds_after_panic_in_lock_holder() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = dir.join("test.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        // Panic while holding the WAL lock; with std::sync this would poison.
        let poison_target = Arc::clone(&tm);
        let _ = std::thread::spawn(move || {
            let _guard = poison_target.wal.lock();
            panic!("would-poison the wal mutex on std::sync");
        })
        .join();

        match tm.begin() {
            Ok(_) => {}
            Err(err) => panic!("begin must succeed despite prior panic: {err:?}"),
        }

        cleanup(&dir);
    }

    /// The read-only commit fast path must not flush anything, so the
    /// durable LSN stays where it was.
    #[test]
    fn read_only_commit_does_not_advance_durable_lsn() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("ro_durable.db");
        let wal_path = dir.join("ro_durable.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let before = {
            let wal = tm.wal_writer().unwrap();
            wal.durable_lsn()
        };

        let tx = tm.begin().unwrap();
        tx.commit().unwrap();

        let after = {
            let wal = tm.wal_writer().unwrap();
            wal.durable_lsn()
        };
        assert_eq!(
            before, after,
            "read-only commit must not advance durable_lsn (was {} → {})",
            before, after
        );

        cleanup(&dir);
    }

    /// Read-only commits must write zero bytes: the WAL stays at its
    /// 8-byte header even after many of them.
    #[test]
    fn read_only_commit_does_not_grow_wal_file() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("ro_size.db");
        let wal_path = dir.join("ro_size.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let size_before = std::fs::metadata(&wal_path).unwrap().len();
        assert_eq!(
            size_before, 8,
            "fresh WAL must be exactly the 8-byte header"
        );

        for _ in 0..100 {
            let tx = tm.begin().unwrap();
            tx.commit().unwrap();
        }

        let size_after = std::fs::metadata(&wal_path).unwrap().len();
        assert_eq!(
            size_after, size_before,
            "100 read-only commits should not have written any WAL bytes"
        );
        cleanup(&dir);
    }

    /// The fast path must still unregister the transaction from the manager.
    #[test]
    fn read_only_commit_marks_transaction_committed() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("ro_state.db");
        let wal_path = dir.join("ro_state.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let tx = tm.begin().unwrap();
        let id = tx.id();
        tx.commit().unwrap();

        assert!(
            !tm.active_transactions().contains(&id),
            "RO-committed txn {id} must no longer be active in the manager"
        );

        cleanup(&dir);
    }

    /// A writing commit after a read-only fast-path commit must still append
    /// and flush WAL records and apply the page to the pager.
    #[test]
    fn writing_commit_still_syncs_after_ro_fast_path() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("rw_after_ro.db");
        let wal_path = dir.join("rw_after_ro.wal");

        let pager = Arc::new(Pager::open_default(&db_path).unwrap());
        let allocated = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = allocated.page_id();
        let tm = Arc::new(TransactionManager::new(Arc::clone(&pager), &wal_path).unwrap());

        let ro = tm.begin().unwrap();
        ro.commit().unwrap();

        let mut rw = tm.begin().unwrap();
        let mut page = Page::new(PageType::BTreeLeaf, page_id);
        page.as_bytes_mut()[42] = 0x77;
        rw.write_page(page_id, page).unwrap();
        rw.commit().unwrap();

        // The WAL must now hold more than just the 8-byte header.
        let size = std::fs::metadata(&wal_path).unwrap().len();
        assert!(size > 8, "writing commit should grow the WAL");

        let read_back = pager.read_page(page_id).unwrap();
        assert_eq!(read_back.as_bytes()[42], 0x77);

        cleanup(&dir);
    }
}