vibesql_storage/database/
transactions.rs1use std::collections::HashMap;
6
7use crate::{wal::TransactionDurability, Row, StorageError, Table};
8
9#[derive(Debug, Clone)]
11#[allow(clippy::large_enum_variant)]
12pub enum TransactionChange {
13 Insert { table_name: String, row: Row },
14 Update { table_name: String, old_row: Row, new_row: Row },
15 Delete { table_name: String, row: Row },
16}
17
18#[derive(Debug, Clone)]
20pub struct Savepoint {
21 pub name: String,
22 pub snapshot_index: usize,
24}
25
26#[derive(Debug, Clone)]
28#[allow(clippy::large_enum_variant)]
29pub enum TransactionState {
30 None,
32 Active {
34 id: u64,
36 original_catalog: vibesql_catalog::Catalog,
38 original_tables: HashMap<String, Table>,
40 savepoints: Vec<Savepoint>,
42 changes: Vec<TransactionChange>,
44 durability: TransactionDurability,
46 },
47}
48
49#[derive(Debug, Clone)]
51pub struct TransactionManager {
52 transaction_state: TransactionState,
54 next_transaction_id: u64,
56}
57
58impl TransactionManager {
59 pub fn new() -> Self {
61 TransactionManager { transaction_state: TransactionState::None, next_transaction_id: 1 }
62 }
63
64 pub fn record_change(&mut self, change: TransactionChange) {
66 if let TransactionState::Active { changes, .. } = &mut self.transaction_state {
67 changes.push(change);
68 }
69 }
70
71 pub fn begin_transaction(
73 &mut self,
74 catalog: &vibesql_catalog::Catalog,
75 tables: &HashMap<String, Table>,
76 ) -> Result<(), StorageError> {
77 self.begin_transaction_with_durability(catalog, tables, TransactionDurability::Default)
78 }
79
80 pub fn begin_transaction_with_durability(
82 &mut self,
83 catalog: &vibesql_catalog::Catalog,
84 tables: &HashMap<String, Table>,
85 durability: TransactionDurability,
86 ) -> Result<(), StorageError> {
87 match self.transaction_state {
88 TransactionState::None => {
89 let original_catalog = catalog.clone();
91 let original_tables = tables.clone();
92
93 let transaction_id = self.next_transaction_id;
94 self.next_transaction_id += 1;
95
96 self.transaction_state = TransactionState::Active {
97 id: transaction_id,
98 original_catalog,
99 original_tables,
100 savepoints: Vec::new(),
101 changes: Vec::new(),
102 durability,
103 };
104 Ok(())
105 }
106 TransactionState::Active { .. } => {
107 Err(StorageError::TransactionError("Transaction already active".to_string()))
108 }
109 }
110 }
111
112 pub fn commit_transaction(&mut self) -> Result<(), StorageError> {
114 match self.transaction_state {
115 TransactionState::None => {
116 Err(StorageError::TransactionError("No active transaction to commit".to_string()))
117 }
118 TransactionState::Active { .. } => {
119 self.transaction_state = TransactionState::None;
122 Ok(())
123 }
124 }
125 }
126
127 pub fn rollback_transaction(
129 &mut self,
130 catalog: &mut vibesql_catalog::Catalog,
131 tables: &mut HashMap<String, Table>,
132 ) -> Result<(), StorageError> {
133 match &self.transaction_state {
134 TransactionState::None => {
135 Err(StorageError::TransactionError("No active transaction to rollback".to_string()))
136 }
137 TransactionState::Active { original_catalog, original_tables, .. } => {
138 *catalog = original_catalog.clone();
140 *tables = original_tables.clone();
141 self.transaction_state = TransactionState::None;
142 Ok(())
143 }
144 }
145 }
146
147 pub fn in_transaction(&self) -> bool {
149 matches!(self.transaction_state, TransactionState::Active { .. })
150 }
151
152 pub fn transaction_id(&self) -> Option<u64> {
154 match &self.transaction_state {
155 TransactionState::Active { id, .. } => Some(*id),
156 TransactionState::None => None,
157 }
158 }
159
160 pub fn get_durability(&self) -> Option<TransactionDurability> {
162 match &self.transaction_state {
163 TransactionState::Active { durability, .. } => Some(*durability),
164 TransactionState::None => None,
165 }
166 }
167
168 pub fn create_savepoint(&mut self, name: String) -> Result<(), StorageError> {
170 match &mut self.transaction_state {
171 TransactionState::None => {
172 Err(StorageError::TransactionError("No active transaction".to_string()))
173 }
174 TransactionState::Active { savepoints, changes, .. } => {
175 let savepoint = Savepoint { name, snapshot_index: changes.len() };
176 savepoints.push(savepoint);
177 Ok(())
178 }
179 }
180 }
181
182 pub fn rollback_to_savepoint(
184 &mut self,
185 name: String,
186 ) -> Result<Vec<TransactionChange>, StorageError> {
187 match &mut self.transaction_state {
188 TransactionState::None => {
189 Err(StorageError::TransactionError("No active transaction".to_string()))
190 }
191 TransactionState::Active { savepoints, changes, .. } => {
192 let savepoint_idx =
194 savepoints.iter().position(|sp| sp.name == name).ok_or_else(|| {
195 StorageError::TransactionError(format!("Savepoint '{}' not found", name))
196 })?;
197
198 let snapshot_index = savepoints[savepoint_idx].snapshot_index;
199
200 let changes_to_undo: Vec<_> = changes.drain(snapshot_index..).collect();
202
203 savepoints.truncate(savepoint_idx + 1);
205
206 Ok(changes_to_undo)
207 }
208 }
209 }
210
211 pub fn release_savepoint(&mut self, name: String) -> Result<(), StorageError> {
213 match &mut self.transaction_state {
214 TransactionState::None => {
215 Err(StorageError::TransactionError("No active transaction".to_string()))
216 }
217 TransactionState::Active { savepoints, .. } => {
218 let savepoint_idx =
219 savepoints.iter().position(|sp| sp.name == name).ok_or_else(|| {
220 StorageError::TransactionError(format!("Savepoint '{}' not found", name))
221 })?;
222
223 savepoints.remove(savepoint_idx);
225
226 Ok(())
227 }
228 }
229 }
230}
231
232impl Default for TransactionManager {
233 fn default() -> Self {
234 Self::new()
235 }
236}