1use crate::backends::DatabaseTransaction;
7use crate::database::ManagedPool;
8use crate::error::{ModelError, ModelResult};
9use tracing::{debug, warn};
10
/// Transaction isolation levels that can be requested when a transaction begins.
///
/// Each variant maps to the SQL keyword returned by [`IsolationLevel::as_sql`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Rendered as `READ UNCOMMITTED`.
    ReadUncommitted,
    /// Rendered as `READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `SERIALIZABLE`.
    Serializable,
}

impl IsolationLevel {
    /// Returns the SQL keyword(s) for this level, suitable for appending to
    /// `SET TRANSACTION ISOLATION LEVEL `.
    pub fn as_sql(&self) -> &'static str {
        match *self {
            Self::Serializable => "SERIALIZABLE",
            Self::RepeatableRead => "REPEATABLE READ",
            Self::ReadCommitted => "READ COMMITTED",
            Self::ReadUncommitted => "READ UNCOMMITTED",
        }
    }
}
35
36#[derive(Debug, Clone)]
38pub struct TransactionConfig {
39 pub isolation_level: Option<IsolationLevel>,
41 pub read_only: bool,
43 pub auto_retry: bool,
45 pub max_retries: u32,
47}
48
49impl Default for TransactionConfig {
50 fn default() -> Self {
51 Self {
52 isolation_level: None, read_only: false,
54 auto_retry: false,
55 max_retries: 3,
56 }
57 }
58}
59
60pub struct Transaction {
62 inner: Option<Box<dyn DatabaseTransaction>>,
63 config: TransactionConfig,
64 committed: bool,
65}
66
67impl Transaction {
68 pub async fn begin(
70 pool: &ManagedPool,
71 config: TransactionConfig,
72 ) -> Result<Transaction, ModelError> {
73 debug!("Beginning transaction with config: {:?}", config);
74
75 let mut tx = pool
76 .begin_transaction()
77 .await
78 .map_err(|e| ModelError::Transaction(format!("Failed to begin transaction: {}", e)))?;
79
80 if let Some(isolation_level) = config.isolation_level {
82 let sql = format!(
83 "SET TRANSACTION ISOLATION LEVEL {}",
84 isolation_level.as_sql()
85 );
86 tx.execute(&sql, &[]).await.map_err(|e| {
87 ModelError::Transaction(format!("Failed to set isolation level: {}", e))
88 })?;
89 debug!("Transaction isolation level set to: {:?}", isolation_level);
90 }
91
92 if config.read_only {
94 tx.execute("SET TRANSACTION READ ONLY", &[])
95 .await
96 .map_err(|e| {
97 ModelError::Transaction(format!("Failed to set read-only mode: {}", e))
98 })?;
99 debug!("Transaction set to read-only mode");
100 }
101
102 Ok(Transaction {
103 inner: Some(tx),
104 config,
105 committed: false,
106 })
107 }
108
109 pub async fn begin_default(pool: &ManagedPool) -> Result<Transaction, ModelError> {
111 Self::begin(pool, TransactionConfig::default()).await
112 }
113
114 pub async fn begin_read_only(pool: &ManagedPool) -> Result<Transaction, ModelError> {
116 let config = TransactionConfig {
117 read_only: true,
118 ..Default::default()
119 };
120 Self::begin(pool, config).await
121 }
122
123 pub async fn begin_serializable(pool: &ManagedPool) -> Result<Transaction, ModelError> {
125 let config = TransactionConfig {
126 isolation_level: Some(IsolationLevel::Serializable),
127 auto_retry: true, ..Default::default()
129 };
130 Self::begin(pool, config).await
131 }
132
133 pub fn is_active(&self) -> bool {
135 self.inner.is_some() && !self.committed
136 }
137
138 pub fn as_mut(&mut self) -> Option<&mut Box<dyn DatabaseTransaction>> {
145 self.inner.as_mut()
146 }
147
148 pub fn as_ref(&self) -> Option<&Box<dyn DatabaseTransaction>> {
150 self.inner.as_ref()
151 }
152
153 pub async fn execute(
155 &mut self,
156 sql: &str,
157 params: &[crate::backends::DatabaseValue],
158 ) -> Result<u64, ModelError> {
159 if let Some(tx) = &mut self.inner {
160 tx.execute(sql, params)
161 .await
162 .map_err(|e| ModelError::Transaction(format!("Transaction query failed: {}", e)))
163 } else {
164 Err(ModelError::Transaction(
165 "Transaction has been consumed".to_string(),
166 ))
167 }
168 }
169
170 pub async fn fetch_all(
172 &mut self,
173 sql: &str,
174 params: &[crate::backends::DatabaseValue],
175 ) -> Result<Vec<Box<dyn crate::backends::DatabaseRow>>, ModelError> {
176 if let Some(tx) = &mut self.inner {
177 tx.fetch_all(sql, params)
178 .await
179 .map_err(|e| ModelError::Transaction(format!("Transaction query failed: {}", e)))
180 } else {
181 Err(ModelError::Transaction(
182 "Transaction has been consumed".to_string(),
183 ))
184 }
185 }
186
187 pub async fn fetch_optional(
189 &mut self,
190 sql: &str,
191 params: &[crate::backends::DatabaseValue],
192 ) -> Result<Option<Box<dyn crate::backends::DatabaseRow>>, ModelError> {
193 if let Some(tx) = &mut self.inner {
194 tx.fetch_optional(sql, params)
195 .await
196 .map_err(|e| ModelError::Transaction(format!("Transaction query failed: {}", e)))
197 } else {
198 Err(ModelError::Transaction(
199 "Transaction has been consumed".to_string(),
200 ))
201 }
202 }
203
204 pub async fn execute_with<F, Fut, R>(&mut self, f: F) -> Result<R, ModelError>
209 where
210 F: FnOnce(&mut Self) -> Fut,
211 Fut: std::future::Future<Output = Result<R, ModelError>>,
212 {
213 if self.inner.is_some() {
214 f(self).await
215 } else {
216 Err(ModelError::Transaction(
217 "Transaction has been consumed".to_string(),
218 ))
219 }
220 }
221
222 pub async fn commit(mut self) -> ModelResult<()> {
224 if let Some(tx) = self.inner.take() {
225 debug!("Committing transaction");
226 tx.commit().await.map_err(|e| {
227 ModelError::Transaction(format!("Failed to commit transaction: {}", e))
228 })?;
229 debug!("Transaction committed successfully");
230 Ok(())
231 } else {
232 Err(ModelError::Transaction(
233 "Transaction has already been consumed".to_string(),
234 ))
235 }
236 }
237
238 pub async fn rollback(mut self) -> ModelResult<()> {
240 if let Some(tx) = self.inner.take() {
241 debug!("Rolling back transaction");
242 tx.rollback().await.map_err(|e| {
243 ModelError::Transaction(format!("Failed to rollback transaction: {}", e))
244 })?;
245 debug!("Transaction rolled back successfully");
246 Ok(())
247 } else {
248 Err(ModelError::Transaction(
249 "Transaction has already been consumed".to_string(),
250 ))
251 }
252 }
253
254 pub fn is_committed(&self) -> bool {
256 self.committed
257 }
258
259 pub fn config(&self) -> &TransactionConfig {
261 &self.config
262 }
263}
264
265impl Drop for Transaction {
266 fn drop(&mut self) {
268 if let Some(tx) = self.inner.take() {
269 if !self.committed {
270 warn!("Transaction dropped without explicit commit or rollback - this will cause an automatic rollback");
271 std::mem::drop(tx);
274 }
275 }
276 }
277}
278
279pub async fn with_transaction<F, Fut, R>(
286 pool: &ManagedPool,
287 config: TransactionConfig,
288 f: F,
289) -> Result<R, ModelError>
290where
291 F: Fn(&mut Transaction) -> Fut,
292 Fut: std::future::Future<Output = Result<R, ModelError>>,
293{
294 let mut attempts = 0;
295 let max_attempts = if config.auto_retry {
296 config.max_retries + 1
297 } else {
298 1
299 };
300
301 loop {
302 attempts += 1;
303 debug!(
304 "Starting transaction attempt {} of {}",
305 attempts, max_attempts
306 );
307
308 let mut tx = Transaction::begin(pool, config.clone()).await?;
309
310 match f(&mut tx).await {
311 Ok(result) => {
312 tx.commit().await?;
313 return Ok(result);
314 }
315 Err(e) => {
316 let should_retry =
318 config.auto_retry && attempts < max_attempts && is_serialization_failure(&e);
319
320 if should_retry {
321 warn!(
322 "Serialization failure on attempt {}, retrying: {}",
323 attempts, e
324 );
325 tx.rollback().await.ok(); continue;
327 } else {
328 tx.rollback().await.ok(); return Err(e);
330 }
331 }
332 }
333 }
334}
335
336pub async fn with_transaction_default<F, Fut, R>(pool: &ManagedPool, f: F) -> Result<R, ModelError>
338where
339 F: Fn(&mut Transaction) -> Fut,
340 Fut: std::future::Future<Output = Result<R, ModelError>>,
341{
342 with_transaction(pool, TransactionConfig::default(), f).await
343}
344
345pub fn is_serialization_failure(error: &ModelError) -> bool {
347 match error {
348 ModelError::Database(msg) | ModelError::Transaction(msg) => {
349 msg.contains("40001") || msg.contains("40P01") || msg.contains("could not serialize access")
353 }
354 _ => false,
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Every isolation level renders as its exact SQL keyword.
    #[test]
    fn test_isolation_level_sql() {
        let expected = [
            (IsolationLevel::ReadUncommitted, "READ UNCOMMITTED"),
            (IsolationLevel::ReadCommitted, "READ COMMITTED"),
            (IsolationLevel::RepeatableRead, "REPEATABLE READ"),
            (IsolationLevel::Serializable, "SERIALIZABLE"),
        ];

        for (level, keyword) in expected.iter() {
            assert_eq!(level.as_sql(), *keyword);
        }
    }

    /// The default configuration is read-write, non-retrying, with a budget of 3.
    #[test]
    fn test_transaction_config_default() {
        let TransactionConfig {
            isolation_level,
            read_only,
            auto_retry,
            max_retries,
        } = TransactionConfig::default();

        assert!(isolation_level.is_none());
        assert!(!read_only);
        assert!(!auto_retry);
        assert_eq!(max_retries, 3);
    }

    /// Known serialization markers are detected; unrelated variants are not.
    #[test]
    fn test_serialization_failure_detection() {
        let by_message = ModelError::Database(
            "ERROR: could not serialize access due to concurrent update".to_string(),
        );
        let by_code = ModelError::Transaction("ERROR: 40001".to_string());
        let unrelated = ModelError::Validation("Invalid input".to_string());

        assert!(is_serialization_failure(&by_message));
        assert!(is_serialization_failure(&by_code));
        assert!(!is_serialization_failure(&unrelated));
    }
}
392}