sqlx_transaction_manager/
executor.rs

1use super::context::TransactionContext;
2use sqlx::MySqlPool;
3use std::future::Future;
4use std::pin::Pin;
5
6/// Executes a function within a database transaction.
7///
8/// This function handles the transaction lifecycle automatically:
9/// - Begins a transaction
10/// - Executes the provided function
11/// - Commits on success
12/// - Rolls back on error
13///
14/// # Type Parameters
15///
16/// * `F` - A function that takes a mutable `TransactionContext` and returns a pinned future
17/// * `T` - The return type of the function (must be `Send`)
18///
19/// # Arguments
20///
21/// * `pool` - The MySQL connection pool
22/// * `f` - The function to execute within the transaction
23///
24/// # Returns
25///
26/// Returns the result of the function execution, or an error if the transaction fails.
27///
28/// # Examples
29///
30/// ## Basic Usage
31///
32/// ```rust,no_run
33/// use sqlx::MySqlPool;
34/// use sqlx_transaction_manager::with_transaction;
35///
36/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
37/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
38/// with_transaction(&pool, |tx| {
39///     Box::pin(async move {
40///         sqlx::query("INSERT INTO users (name) VALUES (?)")
41///             .bind("Alice")
42///             .execute(tx.as_executor())
43///             .await?;
44///         Ok::<_, sqlx::Error>(())
45///     })
46/// }).await?;
47/// # Ok(())
48/// # }
49/// ```
50///
51/// ## Multiple Operations
52///
53/// ```rust,no_run
54/// use sqlx::MySqlPool;
55/// use sqlx_transaction_manager::with_transaction;
56///
57/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
58/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
59/// let user_id = with_transaction(&pool, |tx| {
60///     Box::pin(async move {
61///         let result = sqlx::query("INSERT INTO users (name) VALUES (?)")
62///             .bind("Bob")
63///             .execute(tx.as_executor())
64///             .await?;
65///
66///         let user_id = result.last_insert_id() as i64;
67///
68///         sqlx::query("INSERT INTO profiles (user_id, bio) VALUES (?, ?)")
69///             .bind(user_id)
70///             .bind("Software Developer")
71///             .execute(tx.as_executor())
72///             .await?;
73///
74///         Ok::<_, sqlx::Error>(user_id)
75///     })
76/// }).await?;
77/// # Ok(())
78/// # }
79/// ```
80///
81/// ## Error Handling
82///
83/// ```rust,no_run
84/// use sqlx::MySqlPool;
85/// use sqlx_transaction_manager::with_transaction;
86///
87/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
88/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
89/// let result = with_transaction(&pool, |tx| {
90///     Box::pin(async move {
91///         sqlx::query("INSERT INTO users (name) VALUES (?)")
92///             .bind("Charlie")
93///             .execute(tx.as_executor())
94///             .await?;
95///
96///         // If this fails, the entire transaction is rolled back
97///         sqlx::query("INVALID SQL")
98///             .execute(tx.as_executor())
99///             .await?;
100///
101///         Ok::<_, sqlx::Error>(())
102///     })
103/// }).await;
104///
105/// assert!(result.is_err()); // Transaction was rolled back
106/// # Ok(())
107/// # }
108/// ```
109pub async fn with_transaction<F, T>(pool: &MySqlPool, f: F) -> crate::Result<T>
110where
111    F: for<'a> FnOnce(
112        &'a mut TransactionContext<'_>,
113    ) -> Pin<Box<dyn Future<Output = crate::Result<T>> + Send + 'a>>,
114    T: Send,
115{
116    let mut tx_ctx = TransactionContext::begin(pool).await?;
117
118    match f(&mut tx_ctx).await {
119        Ok(result) => {
120            tx_ctx.commit().await?;
121            Ok(result)
122        }
123        Err(e) => {
124            // Explicitly rollback on error
125            // (Transaction would auto-rollback on drop anyway, but this makes it clearer)
126            let _ = tx_ctx.rollback().await;
127            Err(e)
128        }
129    }
130}
131
132/// Executes a nested transaction using savepoints.
133///
134/// This function allows you to create a transaction within an existing transaction
135/// by using MySQL savepoints. If the nested transaction fails, only operations
136/// since the savepoint are rolled back.
137///
138/// # Type Parameters
139///
140/// * `F` - A function that takes a mutable `TransactionContext` and returns a future
141/// * `Fut` - The future type returned by the function
142/// * `T` - The return type (must be `Send`)
143///
144/// # Arguments
145///
146/// * `tx_ctx` - The existing transaction context
147/// * `f` - The function to execute within the nested transaction
148///
149/// # Returns
150///
151/// Returns the result of the function execution, or an error if the savepoint operation fails.
152///
153/// # Examples
154///
155/// ```rust,no_run
156/// use sqlx::MySqlPool;
157/// use sqlx_transaction_manager::{with_transaction, with_nested_transaction};
158///
159/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
160/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
161/// with_transaction(&pool, |tx| {
162///     Box::pin(async move {
163///         // Outer transaction operations
164///         sqlx::query("INSERT INTO users (name) VALUES (?)")
165///             .bind("Alice")
166///             .execute(tx.as_executor())
167///             .await?;
168///
169///         // Nested transaction with savepoint
170///         let nested_result = with_nested_transaction(tx, |nested_tx| {
171///             Box::pin(async move {
172///                 sqlx::query("INSERT INTO logs (message) VALUES (?)")
173///                     .bind("User created")
174///                     .execute(nested_tx.as_executor())
175///                     .await?;
176///                 Ok::<_, sqlx::Error>(())
177///             })
178///         }).await;
179///
180///         // If nested transaction fails, outer transaction can still succeed
181///         if nested_result.is_err() {
182///             println!("Logging failed, but user creation will still commit");
183///         }
184///
185///         Ok::<_, sqlx::Error>(())
186///     })
187/// }).await?;
188/// # Ok(())
189/// # }
190/// ```
191///
192/// # Note
193///
194/// MySQL doesn't support true nested transactions. This function uses SAVEPOINTs
195/// to simulate nested transaction behavior. The savepoint name is `nested_tx`.
196pub async fn with_nested_transaction<F, T>(
197    tx_ctx: &mut TransactionContext<'_>,
198    f: F,
199) -> crate::Result<T>
200where
201    F: for<'a> FnOnce(&'a mut TransactionContext<'_>) -> Pin<Box<dyn Future<Output = crate::Result<T>> + Send + 'a>>,
202    T: Send,
203{
204    // Create a savepoint
205    sqlx::query("SAVEPOINT nested_tx")
206        .execute(tx_ctx.as_executor())
207        .await?;
208
209    match f(tx_ctx).await {
210        Ok(result) => {
211            // Release savepoint (equivalent to commit)
212            sqlx::query("RELEASE SAVEPOINT nested_tx")
213                .execute(tx_ctx.as_executor())
214                .await?;
215            Ok(result)
216        }
217        Err(e) => {
218            // Rollback to savepoint
219            let _ = sqlx::query("ROLLBACK TO SAVEPOINT nested_tx")
220                .execute(tx_ctx.as_executor())
221                .await;
222            Err(e)
223        }
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230
231    #[test]
232    fn test_executor_functions_exist() {
233        // This test just ensures the functions are properly defined
234        // Actual database tests require a connection pool
235    }
236}