// vibesql_storage/database/transaction_api.rs

use super::transactions::TransactionChange;
use super::Database;
use crate::wal::{DurabilityMode, TransactionDurability, WalOp};
use crate::StorageError;

13impl Database {
14 pub fn record_change(&mut self, change: TransactionChange) {
20 self.lifecycle.transaction_manager_mut().record_change(change);
21 }
22
23 pub fn begin_transaction(&mut self) -> Result<(), StorageError> {
25 self.begin_transaction_with_durability(TransactionDurability::Default)
26 }
27
28 pub fn begin_transaction_with_durability(
33 &mut self,
34 durability: TransactionDurability,
35 ) -> Result<(), StorageError> {
36 let catalog = &self.catalog.clone();
37 self.lifecycle.transaction_manager_mut().begin_transaction_with_durability(
38 catalog,
39 &self.tables,
40 durability,
41 )?;
42
43 if let Some(txn_id) = self.transaction_id() {
45 self.emit_wal_op(WalOp::TxnBegin { txn_id });
46 }
47
48 Ok(())
49 }
50
51 pub fn commit_transaction(&mut self) -> Result<(), StorageError> {
53 let txn_id = self.transaction_id();
55 let durability_hint = self.lifecycle.transaction_manager().get_durability();
56
57 self.lifecycle.transaction_manager_mut().commit_transaction()?;
58
59 if let Some(txn_id) = txn_id {
61 self.emit_wal_op(WalOp::TxnCommit { txn_id });
62 }
63
64 if let Some(hint) = durability_hint {
66 let db_mode = self
67 .persistence_engine
68 .as_ref()
69 .map(|e| e.durability_mode())
70 .unwrap_or(DurabilityMode::Lazy);
71
72 let resolved_mode = hint.resolve(db_mode);
73 if resolved_mode.sync_on_commit() {
74 self.sync_persistence()?;
75 }
76 }
77
78 Ok(())
79 }
80
81 pub fn rollback_transaction(&mut self) -> Result<(), StorageError> {
83 let txn_id = self.transaction_id();
85
86 self.lifecycle.perform_rollback(&mut self.catalog, &mut self.tables)?;
87
88 if let Some(txn_id) = txn_id {
90 self.emit_wal_op(WalOp::TxnRollback { txn_id });
91 }
92
93 Ok(())
94 }
95
96 pub fn in_transaction(&self) -> bool {
98 self.lifecycle.transaction_manager().in_transaction()
99 }
100
101 pub fn transaction_id(&self) -> Option<u64> {
103 self.lifecycle.transaction_manager().transaction_id()
104 }
105
106 pub fn create_savepoint(&mut self, name: String) -> Result<(), StorageError> {
108 self.lifecycle.transaction_manager_mut().create_savepoint(name)
109 }
110
111 pub fn rollback_to_savepoint(&mut self, name: String) -> Result<(), StorageError> {
113 let changes_to_undo =
114 self.lifecycle.transaction_manager_mut().rollback_to_savepoint(name)?;
115
116 for change in changes_to_undo.into_iter().rev() {
117 self.undo_change(change)?;
118 }
119
120 Ok(())
121 }
122
123 fn undo_change(&mut self, change: TransactionChange) -> Result<(), StorageError> {
125 match change {
126 TransactionChange::Insert { table_name, row } => {
127 let table = self
128 .get_table_mut(&table_name)
129 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
130 table.remove_row(&row)?;
131 }
132 TransactionChange::Update { table_name, old_row, new_row: _ } => {
133 let table = self
134 .get_table_mut(&table_name)
135 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
136 table.remove_row(&old_row)?;
137 table.insert(old_row)?;
138 }
139 TransactionChange::Delete { table_name, row } => {
140 let table = self
141 .get_table_mut(&table_name)
142 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
143 table.insert(row)?;
144 }
145 }
146 Ok(())
147 }
148
149 pub fn release_savepoint(&mut self, name: String) -> Result<(), StorageError> {
151 self.lifecycle.transaction_manager_mut().release_savepoint(name)
152 }
153}