elif_orm/
transaction.rs

1//! Transaction Management
2//!
3//! Provides high-level transaction management with automatic cleanup,
4//! scoped operations, and comprehensive error handling.
5
6use crate::backends::DatabaseTransaction;
7use crate::database::ManagedPool;
8use crate::error::{ModelError, ModelResult};
9use tracing::{debug, warn};
10
11/// Transaction isolation levels supported by PostgreSQL
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum IsolationLevel {
14    /// Read Uncommitted - lowest isolation level
15    ReadUncommitted,
16    /// Read Committed - default PostgreSQL isolation level
17    ReadCommitted,
18    /// Repeatable Read - stronger consistency guarantees
19    RepeatableRead,
20    /// Serializable - highest isolation level
21    Serializable,
22}
23
24impl IsolationLevel {
25    /// Convert to SQL string for SET TRANSACTION ISOLATION LEVEL command
26    pub fn as_sql(&self) -> &'static str {
27        match self {
28            IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
29            IsolationLevel::ReadCommitted => "READ COMMITTED",
30            IsolationLevel::RepeatableRead => "REPEATABLE READ",
31            IsolationLevel::Serializable => "SERIALIZABLE",
32        }
33    }
34}
35
36/// Transaction configuration options
37#[derive(Debug, Clone)]
38pub struct TransactionConfig {
39    /// Transaction isolation level
40    pub isolation_level: Option<IsolationLevel>,
41    /// Whether the transaction is read-only
42    pub read_only: bool,
43    /// Enable automatic retry on serialization failures
44    pub auto_retry: bool,
45    /// Maximum number of retry attempts
46    pub max_retries: u32,
47}
48
49impl Default for TransactionConfig {
50    fn default() -> Self {
51        Self {
52            isolation_level: None, // Use PostgreSQL default (READ COMMITTED)
53            read_only: false,
54            auto_retry: false,
55            max_retries: 3,
56        }
57    }
58}
59
60/// High-level transaction wrapper with automatic cleanup and enhanced functionality
61pub struct Transaction {
62    inner: Option<Box<dyn DatabaseTransaction>>,
63    config: TransactionConfig,
64    committed: bool,
65}
66
67impl Transaction {
68    /// Create a new transaction with the given configuration
69    pub async fn begin(
70        pool: &ManagedPool,
71        config: TransactionConfig,
72    ) -> Result<Transaction, ModelError> {
73        debug!("Beginning transaction with config: {:?}", config);
74
75        let mut tx = pool
76            .begin_transaction()
77            .await
78            .map_err(|e| ModelError::Transaction(format!("Failed to begin transaction: {}", e)))?;
79
80        // Set isolation level if specified
81        if let Some(isolation_level) = config.isolation_level {
82            let sql = format!(
83                "SET TRANSACTION ISOLATION LEVEL {}",
84                isolation_level.as_sql()
85            );
86            tx.execute(&sql, &[]).await.map_err(|e| {
87                ModelError::Transaction(format!("Failed to set isolation level: {}", e))
88            })?;
89            debug!("Transaction isolation level set to: {:?}", isolation_level);
90        }
91
92        // Set read-only mode if specified
93        if config.read_only {
94            tx.execute("SET TRANSACTION READ ONLY", &[])
95                .await
96                .map_err(|e| {
97                    ModelError::Transaction(format!("Failed to set read-only mode: {}", e))
98                })?;
99            debug!("Transaction set to read-only mode");
100        }
101
102        Ok(Transaction {
103            inner: Some(tx),
104            config,
105            committed: false,
106        })
107    }
108
109    /// Create a transaction with default configuration
110    pub async fn begin_default(pool: &ManagedPool) -> Result<Transaction, ModelError> {
111        Self::begin(pool, TransactionConfig::default()).await
112    }
113
114    /// Create a read-only transaction
115    pub async fn begin_read_only(pool: &ManagedPool) -> Result<Transaction, ModelError> {
116        let config = TransactionConfig {
117            read_only: true,
118            ..Default::default()
119        };
120        Self::begin(pool, config).await
121    }
122
123    /// Create a serializable transaction
124    pub async fn begin_serializable(pool: &ManagedPool) -> Result<Transaction, ModelError> {
125        let config = TransactionConfig {
126            isolation_level: Some(IsolationLevel::Serializable),
127            auto_retry: true, // Enable auto-retry for serializable transactions
128            ..Default::default()
129        };
130        Self::begin(pool, config).await
131    }
132
133    /// Check if transaction is still active
134    pub fn is_active(&self) -> bool {
135        self.inner.is_some() && !self.committed
136    }
137
138    /// Get a mutable reference to the underlying database transaction
139    ///
140    /// # Safety
141    /// This method provides direct access to the underlying transaction.
142    /// Care should be taken not to commit or rollback the transaction directly
143    /// as this will invalidate the Transaction wrapper.
144    pub fn as_mut(&mut self) -> Option<&mut Box<dyn DatabaseTransaction>> {
145        self.inner.as_mut()
146    }
147
148    /// Get an immutable reference to the underlying database transaction
149    pub fn as_ref(&self) -> Option<&Box<dyn DatabaseTransaction>> {
150        self.inner.as_ref()
151    }
152
153    /// Execute a query within this transaction
154    pub async fn execute(
155        &mut self,
156        sql: &str,
157        params: &[crate::backends::DatabaseValue],
158    ) -> Result<u64, ModelError> {
159        if let Some(tx) = &mut self.inner {
160            tx.execute(sql, params)
161                .await
162                .map_err(|e| ModelError::Transaction(format!("Transaction query failed: {}", e)))
163        } else {
164            Err(ModelError::Transaction(
165                "Transaction has been consumed".to_string(),
166            ))
167        }
168    }
169
170    /// Fetch all rows from a query within this transaction
171    pub async fn fetch_all(
172        &mut self,
173        sql: &str,
174        params: &[crate::backends::DatabaseValue],
175    ) -> Result<Vec<Box<dyn crate::backends::DatabaseRow>>, ModelError> {
176        if let Some(tx) = &mut self.inner {
177            tx.fetch_all(sql, params)
178                .await
179                .map_err(|e| ModelError::Transaction(format!("Transaction query failed: {}", e)))
180        } else {
181            Err(ModelError::Transaction(
182                "Transaction has been consumed".to_string(),
183            ))
184        }
185    }
186
187    /// Fetch optional single row from a query within this transaction
188    pub async fn fetch_optional(
189        &mut self,
190        sql: &str,
191        params: &[crate::backends::DatabaseValue],
192    ) -> Result<Option<Box<dyn crate::backends::DatabaseRow>>, ModelError> {
193        if let Some(tx) = &mut self.inner {
194            tx.fetch_optional(sql, params)
195                .await
196                .map_err(|e| ModelError::Transaction(format!("Transaction query failed: {}", e)))
197        } else {
198            Err(ModelError::Transaction(
199                "Transaction has been consumed".to_string(),
200            ))
201        }
202    }
203
204    /// Execute a closure within the transaction scope with a borrowed transaction
205    ///
206    /// This is a safe way to execute database operations within a transaction.
207    /// The closure receives access to execute queries against the transaction.
208    pub async fn execute_with<F, Fut, R>(&mut self, f: F) -> Result<R, ModelError>
209    where
210        F: FnOnce(&mut Self) -> Fut,
211        Fut: std::future::Future<Output = Result<R, ModelError>>,
212    {
213        if self.inner.is_some() {
214            f(self).await
215        } else {
216            Err(ModelError::Transaction(
217                "Transaction has been consumed".to_string(),
218            ))
219        }
220    }
221
222    /// Commit the transaction
223    pub async fn commit(mut self) -> ModelResult<()> {
224        if let Some(tx) = self.inner.take() {
225            debug!("Committing transaction");
226            tx.commit().await.map_err(|e| {
227                ModelError::Transaction(format!("Failed to commit transaction: {}", e))
228            })?;
229            debug!("Transaction committed successfully");
230            Ok(())
231        } else {
232            Err(ModelError::Transaction(
233                "Transaction has already been consumed".to_string(),
234            ))
235        }
236    }
237
238    /// Rollback the transaction
239    pub async fn rollback(mut self) -> ModelResult<()> {
240        if let Some(tx) = self.inner.take() {
241            debug!("Rolling back transaction");
242            tx.rollback().await.map_err(|e| {
243                ModelError::Transaction(format!("Failed to rollback transaction: {}", e))
244            })?;
245            debug!("Transaction rolled back successfully");
246            Ok(())
247        } else {
248            Err(ModelError::Transaction(
249                "Transaction has already been consumed".to_string(),
250            ))
251        }
252    }
253
254    /// Check if the transaction has been committed
255    pub fn is_committed(&self) -> bool {
256        self.committed
257    }
258
259    /// Get the transaction configuration
260    pub fn config(&self) -> &TransactionConfig {
261        &self.config
262    }
263}
264
265impl Drop for Transaction {
266    /// Automatic cleanup: rollback the transaction if it hasn't been committed or rolled back
267    fn drop(&mut self) {
268        if let Some(tx) = self.inner.take() {
269            if !self.committed {
270                warn!("Transaction dropped without explicit commit or rollback - this will cause an automatic rollback");
271                // Note: We can't await in Drop, so we log a warning
272                // The sqlx::Transaction will handle the rollback in its own Drop impl
273                std::mem::drop(tx);
274            }
275        }
276    }
277}
278
279/// Execute a closure within a transaction scope with automatic commit/rollback
280///
281/// This is a convenience function that handles transaction lifecycle automatically:
282/// - If the closure succeeds, the transaction is committed
283/// - If the closure fails, the transaction is rolled back
284/// - Supports automatic retry for serializable transactions
285pub async fn with_transaction<F, Fut, R>(
286    pool: &ManagedPool,
287    config: TransactionConfig,
288    f: F,
289) -> Result<R, ModelError>
290where
291    F: Fn(&mut Transaction) -> Fut,
292    Fut: std::future::Future<Output = Result<R, ModelError>>,
293{
294    let mut attempts = 0;
295    let max_attempts = if config.auto_retry {
296        config.max_retries + 1
297    } else {
298        1
299    };
300
301    loop {
302        attempts += 1;
303        debug!(
304            "Starting transaction attempt {} of {}",
305            attempts, max_attempts
306        );
307
308        let mut tx = Transaction::begin(pool, config.clone()).await?;
309
310        match f(&mut tx).await {
311            Ok(result) => {
312                tx.commit().await?;
313                return Ok(result);
314            }
315            Err(e) => {
316                // Check if this is a serialization failure that can be retried
317                let should_retry =
318                    config.auto_retry && attempts < max_attempts && is_serialization_failure(&e);
319
320                if should_retry {
321                    warn!(
322                        "Serialization failure on attempt {}, retrying: {}",
323                        attempts, e
324                    );
325                    tx.rollback().await.ok(); // Best effort rollback
326                    continue;
327                } else {
328                    tx.rollback().await.ok(); // Best effort rollback
329                    return Err(e);
330                }
331            }
332        }
333    }
334}
335
336/// Execute a closure within a default transaction scope
337pub async fn with_transaction_default<F, Fut, R>(pool: &ManagedPool, f: F) -> Result<R, ModelError>
338where
339    F: Fn(&mut Transaction) -> Fut,
340    Fut: std::future::Future<Output = Result<R, ModelError>>,
341{
342    with_transaction(pool, TransactionConfig::default(), f).await
343}
344
345/// Check if an error represents a serialization failure that can be retried
346pub fn is_serialization_failure(error: &ModelError) -> bool {
347    match error {
348        ModelError::Database(msg) | ModelError::Transaction(msg) => {
349            // PostgreSQL serialization failure error codes
350            msg.contains("40001") || // serialization_failure
351            msg.contains("40P01") || // deadlock_detected
352            msg.contains("could not serialize access")
353        }
354        _ => false,
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361
362    #[test]
363    fn test_isolation_level_sql() {
364        assert_eq!(IsolationLevel::ReadUncommitted.as_sql(), "READ UNCOMMITTED");
365        assert_eq!(IsolationLevel::ReadCommitted.as_sql(), "READ COMMITTED");
366        assert_eq!(IsolationLevel::RepeatableRead.as_sql(), "REPEATABLE READ");
367        assert_eq!(IsolationLevel::Serializable.as_sql(), "SERIALIZABLE");
368    }
369
370    #[test]
371    fn test_transaction_config_default() {
372        let config = TransactionConfig::default();
373        assert!(config.isolation_level.is_none());
374        assert!(!config.read_only);
375        assert!(!config.auto_retry);
376        assert_eq!(config.max_retries, 3);
377    }
378
379    #[test]
380    fn test_serialization_failure_detection() {
381        let err1 = ModelError::Database(
382            "ERROR: could not serialize access due to concurrent update".to_string(),
383        );
384        assert!(is_serialization_failure(&err1));
385
386        let err2 = ModelError::Transaction("ERROR: 40001".to_string());
387        assert!(is_serialization_failure(&err2));
388
389        let err3 = ModelError::Validation("Invalid input".to_string());
390        assert!(!is_serialization_failure(&err3));
391    }
392}