vibesql_storage/database/
transactions.rs

1// ============================================================================
2// Transaction Management
3// ============================================================================
4
5use std::collections::HashMap;
6
7use crate::{wal::TransactionDurability, Row, StorageError, Table};
8
9/// A single change made during a transaction
10#[derive(Debug, Clone)]
11#[allow(clippy::large_enum_variant)]
12pub enum TransactionChange {
13    Insert { table_name: String, row: Row },
14    Update { table_name: String, old_row: Row, new_row: Row },
15    Delete { table_name: String, row: Row },
16}
17
/// A named marker inside a transaction's change log.
#[derive(Clone, Debug)]
pub struct Savepoint {
    /// Name used to refer to this savepoint.
    pub name: String,
    /// Length of the transaction's change log at the moment the savepoint was created;
    /// changes at this index and beyond were recorded after the savepoint.
    pub snapshot_index: usize,
}
25
26/// Transaction state
27#[derive(Debug, Clone)]
28#[allow(clippy::large_enum_variant)]
29pub enum TransactionState {
30    /// No active transaction
31    None,
32    /// Transaction is active
33    Active {
34        /// Transaction ID for debugging
35        id: u64,
36        /// Original catalog snapshot for full rollback
37        original_catalog: vibesql_catalog::Catalog,
38        /// Original table snapshots for full rollback
39        original_tables: HashMap<String, Table>,
40        /// Stack of savepoints (newest at end)
41        savepoints: Vec<Savepoint>,
42        /// All changes made since transaction start (for incremental undo)
43        changes: Vec<TransactionChange>,
44        /// Durability hint for this transaction
45        durability: TransactionDurability,
46    },
47}
48
49/// Transaction manager - handles all transaction lifecycle and savepoint operations
50#[derive(Debug, Clone)]
51pub struct TransactionManager {
52    /// Current transaction state
53    transaction_state: TransactionState,
54    /// Next transaction ID
55    next_transaction_id: u64,
56}
57
58impl TransactionManager {
59    /// Create a new transaction manager
60    pub fn new() -> Self {
61        TransactionManager { transaction_state: TransactionState::None, next_transaction_id: 1 }
62    }
63
64    /// Record a change in the current transaction (if any)
65    pub fn record_change(&mut self, change: TransactionChange) {
66        if let TransactionState::Active { changes, .. } = &mut self.transaction_state {
67            changes.push(change);
68        }
69    }
70
71    /// Begin a new transaction with default durability
72    pub fn begin_transaction(
73        &mut self,
74        catalog: &vibesql_catalog::Catalog,
75        tables: &HashMap<String, Table>,
76    ) -> Result<(), StorageError> {
77        self.begin_transaction_with_durability(catalog, tables, TransactionDurability::Default)
78    }
79
80    /// Begin a new transaction with a specific durability hint
81    pub fn begin_transaction_with_durability(
82        &mut self,
83        catalog: &vibesql_catalog::Catalog,
84        tables: &HashMap<String, Table>,
85        durability: TransactionDurability,
86    ) -> Result<(), StorageError> {
87        match self.transaction_state {
88            TransactionState::None => {
89                // Create snapshots of catalog and all current tables
90                let original_catalog = catalog.clone();
91                let original_tables = tables.clone();
92
93                let transaction_id = self.next_transaction_id;
94                self.next_transaction_id += 1;
95
96                self.transaction_state = TransactionState::Active {
97                    id: transaction_id,
98                    original_catalog,
99                    original_tables,
100                    savepoints: Vec::new(),
101                    changes: Vec::new(),
102                    durability,
103                };
104                Ok(())
105            }
106            TransactionState::Active { .. } => {
107                Err(StorageError::TransactionError("Transaction already active".to_string()))
108            }
109        }
110    }
111
112    /// Commit the current transaction
113    pub fn commit_transaction(&mut self) -> Result<(), StorageError> {
114        match self.transaction_state {
115            TransactionState::None => {
116                Err(StorageError::TransactionError("No active transaction to commit".to_string()))
117            }
118            TransactionState::Active { .. } => {
119                // Transaction committed - just clear the state
120                // Changes are already in the tables
121                self.transaction_state = TransactionState::None;
122                Ok(())
123            }
124        }
125    }
126
127    /// Rollback the current transaction
128    pub fn rollback_transaction(
129        &mut self,
130        catalog: &mut vibesql_catalog::Catalog,
131        tables: &mut HashMap<String, Table>,
132    ) -> Result<(), StorageError> {
133        match &self.transaction_state {
134            TransactionState::None => {
135                Err(StorageError::TransactionError("No active transaction to rollback".to_string()))
136            }
137            TransactionState::Active { original_catalog, original_tables, .. } => {
138                // Restore catalog and all tables from snapshots
139                *catalog = original_catalog.clone();
140                *tables = original_tables.clone();
141                self.transaction_state = TransactionState::None;
142                Ok(())
143            }
144        }
145    }
146
147    /// Check if we're currently in a transaction
148    pub fn in_transaction(&self) -> bool {
149        matches!(self.transaction_state, TransactionState::Active { .. })
150    }
151
152    /// Get current transaction ID (for debugging)
153    pub fn transaction_id(&self) -> Option<u64> {
154        match &self.transaction_state {
155            TransactionState::Active { id, .. } => Some(*id),
156            TransactionState::None => None,
157        }
158    }
159
160    /// Get the durability hint for the current transaction (if any)
161    pub fn get_durability(&self) -> Option<TransactionDurability> {
162        match &self.transaction_state {
163            TransactionState::Active { durability, .. } => Some(*durability),
164            TransactionState::None => None,
165        }
166    }
167
168    /// Create a savepoint within the current transaction
169    pub fn create_savepoint(&mut self, name: String) -> Result<(), StorageError> {
170        match &mut self.transaction_state {
171            TransactionState::None => {
172                Err(StorageError::TransactionError("No active transaction".to_string()))
173            }
174            TransactionState::Active { savepoints, changes, .. } => {
175                let savepoint = Savepoint { name, snapshot_index: changes.len() };
176                savepoints.push(savepoint);
177                Ok(())
178            }
179        }
180    }
181
182    /// Rollback to a named savepoint - returns the changes that need to be undone
183    pub fn rollback_to_savepoint(
184        &mut self,
185        name: String,
186    ) -> Result<Vec<TransactionChange>, StorageError> {
187        match &mut self.transaction_state {
188            TransactionState::None => {
189                Err(StorageError::TransactionError("No active transaction".to_string()))
190            }
191            TransactionState::Active { savepoints, changes, .. } => {
192                // Find the savepoint
193                let savepoint_idx =
194                    savepoints.iter().position(|sp| sp.name == name).ok_or_else(|| {
195                        StorageError::TransactionError(format!("Savepoint '{}' not found", name))
196                    })?;
197
198                let snapshot_index = savepoints[savepoint_idx].snapshot_index;
199
200                // Collect changes to undo (from snapshot_index to end)
201                let changes_to_undo: Vec<_> = changes.drain(snapshot_index..).collect();
202
203                // Destroy later savepoints
204                savepoints.truncate(savepoint_idx + 1);
205
206                Ok(changes_to_undo)
207            }
208        }
209    }
210
211    /// Release (destroy) a named savepoint
212    pub fn release_savepoint(&mut self, name: String) -> Result<(), StorageError> {
213        match &mut self.transaction_state {
214            TransactionState::None => {
215                Err(StorageError::TransactionError("No active transaction".to_string()))
216            }
217            TransactionState::Active { savepoints, .. } => {
218                let savepoint_idx =
219                    savepoints.iter().position(|sp| sp.name == name).ok_or_else(|| {
220                        StorageError::TransactionError(format!("Savepoint '{}' not found", name))
221                    })?;
222
223                // Remove the savepoint
224                savepoints.remove(savepoint_idx);
225
226                Ok(())
227            }
228        }
229    }
230}
231
232impl Default for TransactionManager {
233    fn default() -> Self {
234        Self::new()
235    }
236}