sqlx_transaction_manager/executor.rs
1use super::context::TransactionContext;
2use sqlx::MySqlPool;
3use std::future::Future;
4use std::pin::Pin;
5
6/// Executes a function within a database transaction.
7///
8/// This function handles the transaction lifecycle automatically:
9/// - Begins a transaction
10/// - Executes the provided function
11/// - Commits on success
12/// - Rolls back on error
13///
14/// # Type Parameters
15///
16/// * `F` - A function that takes a mutable `TransactionContext` and returns a pinned future
17/// * `T` - The return type of the function (must be `Send`)
18///
19/// # Arguments
20///
21/// * `pool` - The MySQL connection pool
22/// * `f` - The function to execute within the transaction
23///
24/// # Returns
25///
26/// Returns the result of the function execution, or an error if the transaction fails.
27///
28/// # Examples
29///
30/// ## Basic Usage
31///
32/// ```rust,no_run
33/// use sqlx::MySqlPool;
34/// use sqlx_transaction_manager::with_transaction;
35///
36/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
37/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
38/// with_transaction(&pool, |tx| {
39/// Box::pin(async move {
40/// sqlx::query("INSERT INTO users (name) VALUES (?)")
41/// .bind("Alice")
42/// .execute(tx.as_executor())
43/// .await?;
44/// Ok::<_, sqlx::Error>(())
45/// })
46/// }).await?;
47/// # Ok(())
48/// # }
49/// ```
50///
51/// ## Multiple Operations
52///
53/// ```rust,no_run
54/// use sqlx::MySqlPool;
55/// use sqlx_transaction_manager::with_transaction;
56///
57/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
58/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
59/// let user_id = with_transaction(&pool, |tx| {
60/// Box::pin(async move {
61/// let result = sqlx::query("INSERT INTO users (name) VALUES (?)")
62/// .bind("Bob")
63/// .execute(tx.as_executor())
64/// .await?;
65///
66/// let user_id = result.last_insert_id() as i64;
67///
68/// sqlx::query("INSERT INTO profiles (user_id, bio) VALUES (?, ?)")
69/// .bind(user_id)
70/// .bind("Software Developer")
71/// .execute(tx.as_executor())
72/// .await?;
73///
74/// Ok::<_, sqlx::Error>(user_id)
75/// })
76/// }).await?;
77/// # Ok(())
78/// # }
79/// ```
80///
81/// ## Error Handling
82///
83/// ```rust,no_run
84/// use sqlx::MySqlPool;
85/// use sqlx_transaction_manager::with_transaction;
86///
87/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
88/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
89/// let result = with_transaction(&pool, |tx| {
90/// Box::pin(async move {
91/// sqlx::query("INSERT INTO users (name) VALUES (?)")
92/// .bind("Charlie")
93/// .execute(tx.as_executor())
94/// .await?;
95///
96/// // If this fails, the entire transaction is rolled back
97/// sqlx::query("INVALID SQL")
98/// .execute(tx.as_executor())
99/// .await?;
100///
101/// Ok::<_, sqlx::Error>(())
102/// })
103/// }).await;
104///
105/// assert!(result.is_err()); // Transaction was rolled back
106/// # Ok(())
107/// # }
108/// ```
109pub async fn with_transaction<F, T>(pool: &MySqlPool, f: F) -> crate::Result<T>
110where
111 F: for<'a> FnOnce(
112 &'a mut TransactionContext<'_>,
113 ) -> Pin<Box<dyn Future<Output = crate::Result<T>> + Send + 'a>>,
114 T: Send,
115{
116 let mut tx_ctx = TransactionContext::begin(pool).await?;
117
118 match f(&mut tx_ctx).await {
119 Ok(result) => {
120 tx_ctx.commit().await?;
121 Ok(result)
122 }
123 Err(e) => {
124 // Explicitly rollback on error
125 // (Transaction would auto-rollback on drop anyway, but this makes it clearer)
126 let _ = tx_ctx.rollback().await;
127 Err(e)
128 }
129 }
130}
131
132/// Executes a nested transaction using savepoints.
133///
134/// This function allows you to create a transaction within an existing transaction
135/// by using MySQL savepoints. If the nested transaction fails, only operations
136/// since the savepoint are rolled back.
137///
138/// # Type Parameters
139///
140/// * `F` - A function that takes a mutable `TransactionContext` and returns a future
141/// * `Fut` - The future type returned by the function
142/// * `T` - The return type (must be `Send`)
143///
144/// # Arguments
145///
146/// * `tx_ctx` - The existing transaction context
147/// * `f` - The function to execute within the nested transaction
148///
149/// # Returns
150///
151/// Returns the result of the function execution, or an error if the savepoint operation fails.
152///
153/// # Examples
154///
155/// ```rust,no_run
156/// use sqlx::MySqlPool;
157/// use sqlx_transaction_manager::{with_transaction, with_nested_transaction};
158///
159/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
160/// # let pool = MySqlPool::connect("mysql://localhost/test").await?;
161/// with_transaction(&pool, |tx| {
162/// Box::pin(async move {
163/// // Outer transaction operations
164/// sqlx::query("INSERT INTO users (name) VALUES (?)")
165/// .bind("Alice")
166/// .execute(tx.as_executor())
167/// .await?;
168///
169/// // Nested transaction with savepoint
170/// let nested_result = with_nested_transaction(tx, |nested_tx| {
171/// Box::pin(async move {
172/// sqlx::query("INSERT INTO logs (message) VALUES (?)")
173/// .bind("User created")
174/// .execute(nested_tx.as_executor())
175/// .await?;
176/// Ok::<_, sqlx::Error>(())
177/// })
178/// }).await;
179///
180/// // If nested transaction fails, outer transaction can still succeed
181/// if nested_result.is_err() {
182/// println!("Logging failed, but user creation will still commit");
183/// }
184///
185/// Ok::<_, sqlx::Error>(())
186/// })
187/// }).await?;
188/// # Ok(())
189/// # }
190/// ```
191///
192/// # Note
193///
194/// MySQL doesn't support true nested transactions. This function uses SAVEPOINTs
195/// to simulate nested transaction behavior. The savepoint name is `nested_tx`.
196pub async fn with_nested_transaction<F, T>(
197 tx_ctx: &mut TransactionContext<'_>,
198 f: F,
199) -> crate::Result<T>
200where
201 F: for<'a> FnOnce(&'a mut TransactionContext<'_>) -> Pin<Box<dyn Future<Output = crate::Result<T>> + Send + 'a>>,
202 T: Send,
203{
204 // Create a savepoint
205 sqlx::query("SAVEPOINT nested_tx")
206 .execute(tx_ctx.as_executor())
207 .await?;
208
209 match f(tx_ctx).await {
210 Ok(result) => {
211 // Release savepoint (equivalent to commit)
212 sqlx::query("RELEASE SAVEPOINT nested_tx")
213 .execute(tx_ctx.as_executor())
214 .await?;
215 Ok(result)
216 }
217 Err(e) => {
218 // Rollback to savepoint
219 let _ = sqlx::query("ROLLBACK TO SAVEPOINT nested_tx")
220 .execute(tx_ctx.as_executor())
221 .await;
222 Err(e)
223 }
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder test: it has no assertions and passes as long as this
    /// module compiles.
    #[test]
    fn test_executor_functions_exist() {
        // Intentionally empty. Exercising the executor functions for real
        // needs a live connection pool, which a plain unit test does not have.
    }
}