1use std::collections::HashMap;
25use std::io;
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
29
30use parking_lot::{Mutex, MutexGuard};
31
32use super::append_coordinator::WalAppendCoordinator;
33use super::record::WalRecord;
34use super::writer::WalWriter;
35use crate::storage::engine::{Page, Pager, PAGE_SIZE};
36
/// First id handed out by `next_transaction_id` in a process.
const FIRST_TX_ID: u64 = 1;

/// Process-wide source of transaction ids.
static NEXT_TX_ID: AtomicU64 = AtomicU64::new(FIRST_TX_ID);

/// Returns a new transaction id, distinct from every id previously issued by this process.
///
/// NOTE(review): the counter restarts at `FIRST_TX_ID` in every process, so new ids may repeat
/// ids already recorded in a WAL written by an earlier run — confirm recovery tolerates that.
fn next_transaction_id() -> u64 {
    // `fetch_add` hands back the value *before* the increment, so concurrent callers never
    // observe the same id.
    NEXT_TX_ID.fetch_add(1, Ordering::SeqCst)
}
44
/// Lifecycle state of a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxState {
    /// Still accepting page reads and writes.
    Active,
    /// Finished through `Transaction::commit`.
    Committed,
    /// Finished through `Transaction::rollback`.
    Aborted,
}
55
/// Errors reported by transaction operations.
#[derive(Debug)]
pub enum TxError {
    /// An underlying I/O failure (for example while flushing the WAL).
    Io(io::Error),
    /// A pager operation failed; carries the pager error's message.
    Pager(String),
    /// A lock was poisoned; carries the lock's name.
    LockPoisoned(&'static str),
    /// The transaction is no longer active.
    NotActive,
    /// The transaction was already committed.
    AlreadyCommitted,
    /// The transaction was already rolled back.
    AlreadyAborted,
    /// A write conflict was detected on the given page id.
    WriteConflict(u32),
    /// A page was rejected as invalid; carries a description.
    InvalidPage(String),
}

impl std::fmt::Display for TxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(e) => write!(f, "I/O error: {}", e),
            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
            Self::LockPoisoned(name) => write!(f, "Lock poisoned: {}", name),
            Self::NotActive => write!(f, "Transaction is not active"),
            Self::AlreadyCommitted => write!(f, "Transaction already committed"),
            Self::AlreadyAborted => write!(f, "Transaction already aborted"),
            Self::WriteConflict(page_id) => write!(f, "Write conflict on page {}", page_id),
            Self::InvalidPage(msg) => write!(f, "Invalid page: {}", msg),
        }
    }
}

impl std::error::Error for TxError {
    /// Exposes the wrapped `io::Error` so callers (and error reporters) can walk the cause chain.
    /// The other variants carry no underlying error value.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Exhaustive on purpose: adding a variant that wraps an error should force a decision here.
        match self {
            Self::Io(e) => Some(e),
            Self::Pager(_)
            | Self::LockPoisoned(_)
            | Self::NotActive
            | Self::AlreadyCommitted
            | Self::AlreadyAborted
            | Self::WriteConflict(_)
            | Self::InvalidPage(_) => None,
        }
    }
}

impl From<io::Error> for TxError {
    /// Wraps an I/O error so `?` can propagate it as `TxError::Io`.
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}
99
100#[derive(Clone)]
102struct BufferedWrite {
103 page_id: u32,
104 data: [u8; PAGE_SIZE],
105}
106
107pub struct Transaction {
111 id: u64,
113 state: TxState,
115 write_set: HashMap<u32, BufferedWrite>,
117 read_set: Vec<u32>,
119 manager: Arc<TransactionManager>,
121}
122
123impl Transaction {
124 pub fn id(&self) -> u64 {
126 self.id
127 }
128
129 pub fn state(&self) -> TxState {
131 self.state
132 }
133
134 pub fn is_active(&self) -> bool {
136 self.state == TxState::Active
137 }
138
139 pub fn read_page(&mut self, page_id: u32) -> Result<Page, TxError> {
144 if self.state != TxState::Active {
145 return Err(TxError::NotActive);
146 }
147
148 if let Some(buffered) = self.write_set.get(&page_id) {
150 return Ok(Page::from_bytes(buffered.data));
151 }
152
153 self.read_set.push(page_id);
155
156 self.manager
158 .pager
159 .read_page(page_id)
160 .map_err(|e| TxError::Pager(e.to_string()))
161 }
162
163 pub fn write_page(&mut self, page_id: u32, page: Page) -> Result<(), TxError> {
167 if self.state != TxState::Active {
168 return Err(TxError::NotActive);
169 }
170
171 let mut data = [0u8; PAGE_SIZE];
173 data.copy_from_slice(page.as_bytes());
174
175 self.write_set
176 .insert(page_id, BufferedWrite { page_id, data });
177
178 Ok(())
179 }
180
181 pub fn commit(mut self) -> Result<(), TxError> {
194 if self.state != TxState::Active {
195 return match self.state {
196 TxState::Committed => Err(TxError::AlreadyCommitted),
197 TxState::Aborted => Err(TxError::AlreadyAborted),
198 _ => Err(TxError::NotActive),
199 };
200 }
201
202 if self.write_set.is_empty() {
207 self.state = TxState::Committed;
208 self.manager.unregister_transaction(self.id);
209 return Ok(());
210 }
211
212 let mut blob = Vec::with_capacity(64 + self.write_set.len() * (PAGE_SIZE + 32));
220 for (page_id, buffered) in &self.write_set {
221 let record = WalRecord::PageWrite {
222 tx_id: self.id,
223 page_id: *page_id,
224 data: buffered.data.to_vec(),
225 };
226 record.encode_into(&mut blob);
230 }
231 WalRecord::Commit { tx_id: self.id }.encode_into(&mut blob);
232
233 let commit_lsn = self.manager.coordinator.reserve_and_enqueue(blob);
238
239 self.manager
246 .coordinator
247 .commit_at_least(commit_lsn, &self.manager.wal)
248 .map_err(TxError::Io)?;
249
250 for (page_id, buffered) in &self.write_set {
252 let page = Page::from_bytes(buffered.data);
253 self.manager
254 .pager
255 .write_page(*page_id, page)
256 .map_err(|e| TxError::Pager(e.to_string()))?;
257 }
258
259 self.state = TxState::Committed;
260
261 self.manager.unregister_transaction(self.id);
263
264 Ok(())
265 }
266
267 pub fn rollback(mut self) -> Result<(), TxError> {
271 if self.state != TxState::Active {
272 return match self.state {
273 TxState::Committed => Err(TxError::AlreadyCommitted),
274 TxState::Aborted => Err(TxError::AlreadyAborted),
275 _ => Err(TxError::NotActive),
276 };
277 }
278
279 let blob = WalRecord::Rollback { tx_id: self.id }.encode();
284 let target = self.manager.coordinator.reserve_and_enqueue(blob);
285 self.manager
286 .coordinator
287 .commit_at_least(target, &self.manager.wal)
288 .map_err(TxError::Io)?;
289
290 self.write_set.clear();
292 self.state = TxState::Aborted;
293
294 self.manager.unregister_transaction(self.id);
296
297 Ok(())
298 }
299}
300
301impl Drop for Transaction {
302 fn drop(&mut self) {
303 if self.state == TxState::Active {
306 let blob = WalRecord::Rollback { tx_id: self.id }.encode();
312 let target = self.manager.coordinator.reserve_and_enqueue(blob);
313 let _ = self
314 .manager
315 .coordinator
316 .commit_at_least(target, &self.manager.wal);
317 self.manager.unregister_transaction(self.id);
318 }
319 }
320}
321
322pub struct TransactionManager {
326 pager: Arc<Pager>,
328 wal: Mutex<WalWriter>,
333 wal_path: PathBuf,
335 active_transactions: RwLock<Vec<u64>>,
337 coordinator: WalAppendCoordinator,
346}
347
348impl TransactionManager {
349 pub fn new(pager: Arc<Pager>, wal_path: impl AsRef<Path>) -> io::Result<Self> {
356 let wal_path = wal_path.as_ref().to_path_buf();
357 let wal = WalWriter::open(&wal_path)?;
358 let initial_current = wal.current_lsn();
359 let initial_durable = wal.durable_lsn();
360
361 Ok(Self {
362 pager,
363 wal: Mutex::new(wal),
364 wal_path,
365 active_transactions: RwLock::new(Vec::new()),
366 coordinator: WalAppendCoordinator::new(initial_current, initial_durable),
367 })
368 }
369
370 fn wal_writer(&self) -> Result<MutexGuard<'_, WalWriter>, TxError> {
371 Ok(self.wal.lock())
378 }
379
380 fn active_transactions_write(&self) -> RwLockWriteGuard<'_, Vec<u64>> {
381 self.active_transactions
382 .write()
383 .unwrap_or_else(|poisoned| poisoned.into_inner())
384 }
385
386 fn active_transactions_read(&self) -> RwLockReadGuard<'_, Vec<u64>> {
387 self.active_transactions
388 .read()
389 .unwrap_or_else(|poisoned| poisoned.into_inner())
390 }
391
392 pub fn begin(self: &Arc<Self>) -> Result<Transaction, TxError> {
394 let tx_id = next_transaction_id();
395
396 let blob = WalRecord::Begin { tx_id }.encode();
404 let _begin_lsn = self.coordinator.reserve_and_enqueue(blob);
405
406 {
408 let mut active = self.active_transactions_write();
409 active.push(tx_id);
410 }
411
412 Ok(Transaction {
413 id: tx_id,
414 state: TxState::Active,
415 write_set: HashMap::new(),
416 read_set: Vec::new(),
417 manager: Arc::clone(self),
418 })
419 }
420
421 fn unregister_transaction(&self, tx_id: u64) {
423 let mut active = self.active_transactions_write();
424 active.retain(|&id| id != tx_id);
425 }
426
427 pub fn active_transactions(&self) -> Vec<u64> {
429 self.active_transactions_read().clone()
430 }
431
432 pub fn wal_path(&self) -> &Path {
434 &self.wal_path
435 }
436
437 pub fn pager(&self) -> &Arc<Pager> {
439 &self.pager
440 }
441
442 pub fn sync_wal(&self) -> io::Result<()> {
446 let target = self.coordinator.next_lsn();
447 self.coordinator.commit_at_least(target, &self.wal)
448 }
449
450 pub fn has_active_transactions(&self) -> bool {
452 !self.active_transactions_read().is_empty()
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::PageType;
    use std::fs;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Per-process sequence number that keeps `temp_dir` paths unique within one clock tick.
    static TEMP_DIR_SEQ: AtomicU64 = AtomicU64::new(0);

    /// Returns a fresh scratch-directory path under the system temp dir.
    ///
    /// A timestamp alone is not unique. Tests run on parallel threads and some platform clocks
    /// are coarser than 1 ns, so two tests could share a directory and one test's `cleanup`
    /// would then delete the other's files. The process id and an atomic sequence number make
    /// every path distinct.
    fn temp_dir() -> PathBuf {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("value is present")
            .as_nanos();
        let seq = TEMP_DIR_SEQ.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "reddb_tx_test_{}_{}_{}",
            timestamp,
            std::process::id(),
            seq
        ))
    }

    /// Best-effort removal of a test's scratch directory.
    fn cleanup(dir: &Path) {
        let _ = fs::remove_dir_all(dir);
    }

    /// WAL path for the named test inside `dir`, built by the shared `reddb_file` layout helper.
    fn temp_wal_path(dir: &Path, name: &str) -> PathBuf {
        reddb_file::layout::wal_component_temp_path(dir, "transaction", name, std::process::id())
    }

    #[test]
    fn test_transaction_commit() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "commit");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));

        let page = pager
            .allocate_page(PageType::BTreeLeaf)
            .expect("allocate_page() should succeed");
        let page_id = page.page_id();

        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let mut tx = tm.begin().expect("begin() should succeed");
        assert!(tx.is_active());

        let mut page = Page::new(PageType::BTreeLeaf, page_id);
        page.as_bytes_mut()[100] = 0xAB;
        tx.write_page(page_id, page)
            .expect("write_page() should succeed");

        // The transaction sees its own buffered write.
        let read_page = tx.read_page(page_id).expect("read_page() should succeed");
        assert_eq!(read_page.as_bytes()[100], 0xAB);

        tx.commit().expect("commit() should succeed");

        // After commit the write has reached the pager.
        let final_page = pager
            .read_page(page_id)
            .expect("read_page() should succeed");
        assert_eq!(final_page.as_bytes()[100], 0xAB);

        cleanup(&dir);
    }

    #[test]
    fn test_transaction_rollback() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "rollback");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));

        let mut page = pager
            .allocate_page(PageType::BTreeLeaf)
            .expect("allocate_page() should succeed");
        let page_id = page.page_id();
        page.as_bytes_mut()[100] = 0x11;
        pager
            .write_page(page_id, page)
            .expect("write_page() should succeed");

        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let mut tx = tm.begin().expect("begin() should succeed");

        let mut page = Page::new(PageType::BTreeLeaf, page_id);
        page.as_bytes_mut()[100] = 0xAB;
        tx.write_page(page_id, page)
            .expect("write_page() should succeed");

        let read_page = tx.read_page(page_id).expect("read_page() should succeed");
        assert_eq!(read_page.as_bytes()[100], 0xAB);

        tx.rollback().expect("rollback() should succeed");

        // The pre-transaction contents survive the rollback.
        let final_page = pager
            .read_page(page_id)
            .expect("read_page() should succeed");
        assert_eq!(final_page.as_bytes()[100], 0x11);

        cleanup(&dir);
    }

    #[test]
    fn test_multiple_transactions() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "multiple");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));

        let page1 = pager
            .allocate_page(PageType::BTreeLeaf)
            .expect("allocate_page() should succeed");
        let page2 = pager
            .allocate_page(PageType::BTreeLeaf)
            .expect("allocate_page() should succeed");
        let page1_id = page1.page_id();
        let page2_id = page2.page_id();

        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let mut tx1 = tm.begin().expect("begin() should succeed");
        let mut page1 = Page::new(PageType::BTreeLeaf, page1_id);
        page1.as_bytes_mut()[100] = 0x11;
        tx1.write_page(page1_id, page1)
            .expect("write_page() should succeed");
        tx1.commit().expect("commit() should succeed");

        let mut tx2 = tm.begin().expect("begin() should succeed");
        let mut page2 = Page::new(PageType::BTreeLeaf, page2_id);
        page2.as_bytes_mut()[100] = 0x22;
        tx2.write_page(page2_id, page2)
            .expect("write_page() should succeed");
        tx2.commit().expect("commit() should succeed");

        let final_page1 = pager
            .read_page(page1_id)
            .expect("read_page() should succeed");
        let final_page2 = pager
            .read_page(page2_id)
            .expect("read_page() should succeed");
        assert_eq!(final_page1.as_bytes()[100], 0x11);
        assert_eq!(final_page2.as_bytes()[100], 0x22);

        cleanup(&dir);
    }

    #[test]
    fn test_transaction_isolation() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "isolation");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));

        let mut page = pager
            .allocate_page(PageType::BTreeLeaf)
            .expect("allocate_page() should succeed");
        let page_id = page.page_id();
        page.as_bytes_mut()[100] = 0x00;
        pager
            .write_page(page_id, page)
            .expect("write_page() should succeed");

        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let mut tx1 = tm.begin().expect("begin() should succeed");
        let mut page1 = Page::new(PageType::BTreeLeaf, page_id);
        page1.as_bytes_mut()[100] = 0x11;
        tx1.write_page(page_id, page1)
            .expect("write_page() should succeed");

        let tx1_read = tx1.read_page(page_id).expect("read_page() should succeed");
        assert_eq!(tx1_read.as_bytes()[100], 0x11);

        // Uncommitted writes are invisible outside the transaction.
        let pager_read = pager
            .read_page(page_id)
            .expect("read_page() should succeed");
        assert_eq!(pager_read.as_bytes()[100], 0x00);

        tx1.commit().expect("commit() should succeed");

        let final_read = pager
            .read_page(page_id)
            .expect("read_page() should succeed");
        assert_eq!(final_read.as_bytes()[100], 0x11);

        cleanup(&dir);
    }

    #[test]
    fn test_active_transaction_tracking() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "tracking");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        assert!(!tm.has_active_transactions());

        let tx1 = tm.begin().expect("begin() should succeed");
        let tx1_id = tx1.id();
        assert!(tm.has_active_transactions());
        assert!(tm.active_transactions().contains(&tx1_id));

        let tx2 = tm.begin().expect("begin() should succeed");
        let tx2_id = tx2.id();
        assert_eq!(tm.active_transactions().len(), 2);

        tx1.commit().expect("commit() should succeed");
        assert!(!tm.active_transactions().contains(&tx1_id));
        assert!(tm.active_transactions().contains(&tx2_id));

        tx2.rollback().expect("rollback() should succeed");
        assert!(!tm.has_active_transactions());

        cleanup(&dir);
    }

    /// `commit` consumes the transaction, so a second commit is rejected at compile time. This
    /// test only checks that a single empty commit succeeds.
    #[test]
    fn test_transaction_double_commit() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "double-commit");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let tx = tm.begin().expect("begin() should succeed");
        tx.commit().expect("commit() should succeed");

        cleanup(&dir);
    }

    #[test]
    fn test_begin_succeeds_after_panic_in_lock_holder() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("test.db");
        let wal_path = temp_wal_path(&dir, "panic-lock-holder");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        // Panic while holding the WAL mutex; a std::sync::Mutex would be poisoned afterwards.
        let poison_target = Arc::clone(&tm);
        let _ = std::thread::spawn(move || {
            let _guard = poison_target.wal.lock();
            panic!("would-poison the wal mutex on std::sync");
        })
        .join();

        match tm.begin() {
            Ok(_) => {}
            Err(err) => panic!("begin must succeed despite prior panic: {err:?}"),
        }

        cleanup(&dir);
    }

    #[test]
    fn read_only_commit_does_not_advance_durable_lsn() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("ro_durable.db");
        let wal_path = temp_wal_path(&dir, "ro-durable");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let before = {
            let wal = tm.wal_writer().expect("wal_writer() should succeed");
            wal.durable_lsn()
        };

        let tx = tm.begin().expect("begin() should succeed");
        tx.commit().expect("commit() should succeed");

        let after = {
            let wal = tm.wal_writer().expect("wal_writer() should succeed");
            wal.durable_lsn()
        };
        assert_eq!(
            before, after,
            "read-only commit must not advance durable_lsn (was {} → {})",
            before, after
        );

        cleanup(&dir);
    }

    #[test]
    fn read_only_commit_does_not_grow_wal_file() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("ro_size.db");
        let wal_path = temp_wal_path(&dir, "ro-size");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let size_before = std::fs::metadata(&wal_path)
            .expect("metadata() should succeed")
            .len();
        assert_eq!(
            size_before,
            reddb_file::WAL_FILE_HEADER_BYTES as u64,
            "fresh WAL must be exactly the 8-byte header"
        );

        for _ in 0..100 {
            let tx = tm.begin().expect("begin() should succeed");
            tx.commit().expect("commit() should succeed");
        }

        let size_after = std::fs::metadata(&wal_path)
            .expect("metadata() should succeed")
            .len();
        assert_eq!(
            size_after, size_before,
            "100 read-only commits should not have written any WAL bytes"
        );
        cleanup(&dir);
    }

    #[test]
    fn read_only_commit_marks_transaction_committed() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("ro_state.db");
        let wal_path = temp_wal_path(&dir, "ro-state");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let tx = tm.begin().expect("begin() should succeed");
        let id = tx.id();
        tx.commit().expect("commit() should succeed");

        assert!(
            !tm.active_transactions().contains(&id),
            "RO-committed txn {id} must no longer be active in the manager"
        );

        cleanup(&dir);
    }

    #[test]
    fn writing_commit_still_syncs_after_ro_fast_path() {
        let dir = temp_dir();
        let _ = fs::create_dir_all(&dir);
        let db_path = dir.join("rw_after_ro.db");
        let wal_path = temp_wal_path(&dir, "rw-after-ro");

        let pager = Arc::new(Pager::open_default(&db_path).expect("open_default() should succeed"));
        let allocated = pager
            .allocate_page(PageType::BTreeLeaf)
            .expect("allocate_page() should succeed");
        let page_id = allocated.page_id();
        let tm = Arc::new(
            TransactionManager::new(Arc::clone(&pager), &wal_path).expect("new() should succeed"),
        );

        let ro = tm.begin().expect("begin() should succeed");
        ro.commit().expect("commit() should succeed");

        let mut rw = tm.begin().expect("begin() should succeed");
        let mut page = Page::new(PageType::BTreeLeaf, page_id);
        page.as_bytes_mut()[42] = 0x77;
        rw.write_page(page_id, page)
            .expect("write_page() should succeed");
        rw.commit().expect("commit() should succeed");

        // Compare against the shared header constant rather than a hard-coded size.
        let size = std::fs::metadata(&wal_path)
            .expect("metadata() should succeed")
            .len();
        assert!(
            size > reddb_file::WAL_FILE_HEADER_BYTES as u64,
            "writing commit should grow the WAL"
        );

        let read_back = pager
            .read_page(page_id)
            .expect("read_page() should succeed");
        assert_eq!(read_back.as_bytes()[42], 0x77);

        cleanup(&dir);
    }
}