Skip to main content

sqlx_core/
transaction.rs

1use std::fmt::{self, Debug, Formatter};
2use std::future::{self, Future};
3use std::ops::{Deref, DerefMut};
4
5use futures_core::future::BoxFuture;
6
7use crate::database::Database;
8use crate::error::Error;
9use crate::pool::MaybePoolConnection;
10use crate::sql_str::{AssertSqlSafe, SqlSafeStr, SqlStr};
11
/// Generic management of database transactions.
///
/// Every operation is an associated function that receives the connection to
/// act on as an explicit argument; the trait itself has no `self` receiver.
///
/// This trait should not be used, except when implementing [`Connection`].
///
/// [`Connection`]: crate::connection::Connection
pub trait TransactionManager {
    /// The database whose connection type the functions of this trait operate on.
    type Database: Database;

    /// Begin a new transaction or establish a savepoint within the active transaction.
    ///
    /// If this is a new transaction, `statement` may be used instead of the
    /// default "BEGIN" statement.
    ///
    /// If we are already inside a transaction and `statement.is_some()`, then
    /// `Error::InvalidSavePoint` is returned without running any statements.
    fn begin(
        conn: &mut <Self::Database as Database>::Connection,
        statement: Option<SqlStr>,
    ) -> impl Future<Output = Result<(), Error>> + Send + '_;

    /// Commit the active transaction or release the most recent savepoint.
    ///
    /// The returned future borrows `conn` until it completes or is dropped.
    fn commit(
        conn: &mut <Self::Database as Database>::Connection,
    ) -> impl Future<Output = Result<(), Error>> + Send + '_;

    /// Abort the active transaction or restore from the most recent savepoint.
    ///
    /// The returned future borrows `conn` until it completes or is dropped.
    fn rollback(
        conn: &mut <Self::Database as Database>::Connection,
    ) -> impl Future<Output = Result<(), Error>> + Send + '_;

    /// Starts to abort the active transaction or restore from the most recent savepoint.
    ///
    /// Unlike [`rollback`](Self::rollback), this is a plain synchronous function that
    /// returns nothing, which is what allows `Transaction`'s `Drop` implementation to
    /// call it. How and when the rollback is actually carried out is left to the
    /// implementation.
    fn start_rollback(conn: &mut <Self::Database as Database>::Connection);

    /// Returns the current transaction depth.
    ///
    /// Transaction depth indicates the level of nested transactions:
    /// - Level 0: No active transaction.
    /// - Level 1: A transaction is active.
    /// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
    fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
}
51
/// An in-progress database transaction or savepoint.
///
/// A transaction starts with a call to [`Pool::begin`] or [`Connection::begin`].
///
/// A transaction should end with a call to [`commit`] or [`rollback`]. If neither is called
/// before the transaction goes out-of-scope, a rollback is started automatically. In other
/// words, the transaction is rolled back on `drop` if it is still in-progress.
///
/// A savepoint is a special mark inside a transaction that allows all commands that are
/// executed after it was established to be rolled back, restoring the transaction state to
/// what it was at the time of the savepoint.
///
/// A transaction can be used as an [`Executor`] when performing queries:
/// ```rust,no_run
/// # use sqlx_core::acquire::Acquire;
/// # async fn example() -> sqlx::Result<()> {
/// # let id = 1;
/// # let mut conn: sqlx::PgConnection = unimplemented!();
/// let mut tx = conn.begin().await?;
///
/// let result = sqlx::query("DELETE FROM \"testcases\" WHERE id = $1")
///     .bind(id)
///     .execute(&mut *tx)
///     .await?
///     .rows_affected();
///
/// tx.commit().await
/// # }
/// ```
/// [`Executor`]: crate::executor::Executor
/// [`Connection::begin`]: crate::connection::Connection::begin()
/// [`Pool::begin`]: crate::pool::Pool::begin()
/// [`commit`]: Self::commit()
/// [`rollback`]: Self::rollback()
pub struct Transaction<'c, DB>
where
    DB: Database,
{
    /// The connection the transaction runs on; all transaction-manager calls and the
    /// `Deref`/`DerefMut`/`AsMut` implementations go through this field.
    connection: MaybePoolConnection<'c, DB>,
    /// `true` from construction until `commit` or `rollback` succeeds. `Drop` starts a
    /// rollback only while this flag is still set.
    open: bool,
}
93
94impl<'c, DB> Transaction<'c, DB>
95where
96    DB: Database,
97{
98    #[doc(hidden)]
99    pub fn begin(
100        conn: impl Into<MaybePoolConnection<'c, DB>>,
101        statement: Option<SqlStr>,
102    ) -> BoxFuture<'c, Result<Self, Error>> {
103        let conn = conn.into();
104
105        Box::pin(async move {
106            let mut tx = Self {
107                connection: conn,
108
109                // If the call to `begin` fails or doesn't complete we want to attempt a rollback in case the transaction was started.
110                open: true,
111            };
112
113            DB::TransactionManager::begin(&mut tx.connection, statement).await?;
114
115            Ok(tx)
116        })
117    }
118
119    /// Commits this transaction or savepoint.
120    pub async fn commit(mut self) -> Result<(), Error> {
121        DB::TransactionManager::commit(&mut self.connection).await?;
122        self.open = false;
123
124        Ok(())
125    }
126
127    /// Aborts this transaction or savepoint.
128    pub async fn rollback(mut self) -> Result<(), Error> {
129        DB::TransactionManager::rollback(&mut self.connection).await?;
130        self.open = false;
131
132        Ok(())
133    }
134}
135
136// NOTE: fails to compile due to lack of lazy normalization
137// impl<'c, 't, DB: Database> crate::executor::Executor<'t>
138//     for &'t mut crate::transaction::Transaction<'c, DB>
139// where
140//     &'c mut DB::Connection: Executor<'c, Database = DB>,
141// {
142//     type Database = DB;
143//
144//
145//
146//     fn fetch_many<'e, 'q: 'e, E: 'q>(
147//         self,
148//         query: E,
149//     ) -> futures_core::stream::BoxStream<
150//         'e,
151//         Result<
152//             crate::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
153//             crate::error::Error,
154//         >,
155//     >
156//     where
157//         't: 'e,
158//         E: crate::executor::Execute<'q, Self::Database>,
159//     {
160//         (&mut **self).fetch_many(query)
161//     }
162//
163//     fn fetch_optional<'e, 'q: 'e, E: 'q>(
164//         self,
165//         query: E,
166//     ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
167//     where
168//         't: 'e,
169//         E: crate::executor::Execute<'q, Self::Database>,
170//     {
171//         (&mut **self).fetch_optional(query)
172//     }
173//
174//     fn prepare_with<'e, 'q: 'e>(
175//         self,
176//         sql: &'q str,
177//         parameters: &'e [<Self::Database as crate::database::Database>::TypeInfo],
178//     ) -> futures_core::future::BoxFuture<
179//         'e,
180//         Result<
181//             <Self::Database as crate::database::Database>::Statement<'q>,
182//             crate::error::Error,
183//         >,
184//     >
185//     where
186//         't: 'e,
187//     {
188//         (&mut **self).prepare_with(sql, parameters)
189//     }
190//
191//     #[doc(hidden)]
192//     #[cfg(feature = "offline")]
193//     fn describe<'e, 'q: 'e>(
194//         self,
195//         query: &'q str,
196//     ) -> futures_core::future::BoxFuture<
197//         'e,
198//         Result<crate::describe::Describe<Self::Database>, crate::error::Error>,
199//     >
200//     where
201//         't: 'e,
202//     {
203//         (&mut **self).describe(query)
204//     }
205// }
206
207impl<DB> Debug for Transaction<'_, DB>
208where
209    DB: Database,
210{
211    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
212        // TODO: Show the full type <..<..<..
213        f.debug_struct("Transaction").finish()
214    }
215}
216
217impl<DB> Deref for Transaction<'_, DB>
218where
219    DB: Database,
220{
221    type Target = DB::Connection;
222
223    #[inline]
224    fn deref(&self) -> &Self::Target {
225        &self.connection
226    }
227}
228
229impl<DB> DerefMut for Transaction<'_, DB>
230where
231    DB: Database,
232{
233    #[inline]
234    fn deref_mut(&mut self) -> &mut Self::Target {
235        &mut self.connection
236    }
237}
238
239// Implement `AsMut<DB::Connection>` so `Transaction` can be given to a
240// `PgAdvisoryLockGuard`.
241//
242// See: https://github.com/launchbadge/sqlx/issues/2520
243impl<DB: Database> AsMut<DB::Connection> for Transaction<'_, DB> {
244    fn as_mut(&mut self) -> &mut DB::Connection {
245        &mut self.connection
246    }
247}
248
249impl<'t, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'_, DB> {
250    type Database = DB;
251
252    type Connection = &'t mut <DB as Database>::Connection;
253
254    #[inline]
255    fn acquire(self) -> BoxFuture<'t, Result<Self::Connection, Error>> {
256        Box::pin(future::ready(Ok(&mut **self)))
257    }
258
259    #[inline]
260    fn begin(self) -> BoxFuture<'t, Result<Transaction<'t, DB>, Error>> {
261        Transaction::begin(&mut **self, None)
262    }
263}
264
265impl<DB> Drop for Transaction<'_, DB>
266where
267    DB: Database,
268{
269    fn drop(&mut self) {
270        if self.open {
271            // starts a rollback operation
272
273            // what this does depends on the database but generally this means we queue a rollback
274            // operation that will happen on the next asynchronous invocation of the underlying
275            // connection (including if the connection is returned to a pool)
276
277            DB::TransactionManager::start_rollback(&mut self.connection);
278        }
279    }
280}
281
282pub fn begin_ansi_transaction_sql(depth: usize) -> SqlStr {
283    if depth == 0 {
284        "BEGIN".into_sql_str()
285    } else {
286        AssertSqlSafe(format!("SAVEPOINT _sqlx_savepoint_{depth}")).into_sql_str()
287    }
288}
289
290pub fn commit_ansi_transaction_sql(depth: usize) -> SqlStr {
291    if depth == 1 {
292        "COMMIT".into_sql_str()
293    } else {
294        AssertSqlSafe(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1)).into_sql_str()
295    }
296}
297
298pub fn rollback_ansi_transaction_sql(depth: usize) -> SqlStr {
299    if depth == 1 {
300        "ROLLBACK".into_sql_str()
301    } else {
302        AssertSqlSafe(format!(
303            "ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
304            depth - 1
305        ))
306        .into_sql_str()
307    }
308}