sqlx_build_trust_core/
transaction.rs

1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3use std::ops::{Deref, DerefMut};
4
5use futures_core::future::BoxFuture;
6
7use crate::database::Database;
8use crate::error::Error;
9use crate::pool::MaybePoolConnection;
10
11/// Generic management of database transactions.
12///
13/// This trait should not be used, except when implementing [`Connection`].
14#[doc(hidden)]
15pub trait TransactionManager {
16    type Database: Database;
17
18    /// Begin a new transaction or establish a savepoint within the active transaction.
19    fn begin(
20        conn: &mut <Self::Database as Database>::Connection,
21    ) -> BoxFuture<'_, Result<(), Error>>;
22
23    /// Commit the active transaction or release the most recent savepoint.
24    fn commit(
25        conn: &mut <Self::Database as Database>::Connection,
26    ) -> BoxFuture<'_, Result<(), Error>>;
27
28    /// Abort the active transaction or restore from the most recent savepoint.
29    fn rollback(
30        conn: &mut <Self::Database as Database>::Connection,
31    ) -> BoxFuture<'_, Result<(), Error>>;
32
33    /// Starts to abort the active transaction or restore from the most recent snapshot.
34    fn start_rollback(conn: &mut <Self::Database as Database>::Connection);
35}
36
37/// An in-progress database transaction or savepoint.
38///
39/// A transaction starts with a call to [`Pool::begin`] or [`Connection::begin`].
40///
41/// A transaction should end with a call to [`commit`] or [`rollback`]. If neither are called
42/// before the transaction goes out-of-scope, [`rollback`] is called. In other
43/// words, [`rollback`] is called on `drop` if the transaction is still in-progress.
44///
45/// A savepoint is a special mark inside a transaction that allows all commands that are
46/// executed after it was established to be rolled back, restoring the transaction state to
47/// what it was at the time of the savepoint.
48///
49/// [`Connection::begin`]: crate::connection::Connection::begin()
50/// [`Pool::begin`]: crate::pool::Pool::begin()
51/// [`commit`]: Self::commit()
52/// [`rollback`]: Self::rollback()
53pub struct Transaction<'c, DB>
54where
55    DB: Database,
56{
57    connection: MaybePoolConnection<'c, DB>,
58    open: bool,
59}
60
61impl<'c, DB> Transaction<'c, DB>
62where
63    DB: Database,
64{
65    #[doc(hidden)]
66    pub fn begin(
67        conn: impl Into<MaybePoolConnection<'c, DB>>,
68    ) -> BoxFuture<'c, Result<Self, Error>> {
69        let mut conn = conn.into();
70
71        Box::pin(async move {
72            DB::TransactionManager::begin(&mut conn).await?;
73
74            Ok(Self {
75                connection: conn,
76                open: true,
77            })
78        })
79    }
80
81    /// Commits this transaction or savepoint.
82    pub async fn commit(mut self) -> Result<(), Error> {
83        DB::TransactionManager::commit(&mut self.connection).await?;
84        self.open = false;
85
86        Ok(())
87    }
88
89    /// Aborts this transaction or savepoint.
90    pub async fn rollback(mut self) -> Result<(), Error> {
91        DB::TransactionManager::rollback(&mut self.connection).await?;
92        self.open = false;
93
94        Ok(())
95    }
96}
97
98// NOTE: fails to compile due to lack of lazy normalization
99// impl<'c, 't, DB: Database> crate::executor::Executor<'t>
100//     for &'t mut crate::transaction::Transaction<'c, DB>
101// where
102//     &'c mut DB::Connection: Executor<'c, Database = DB>,
103// {
104//     type Database = DB;
105//
106//
107//
108//     fn fetch_many<'e, 'q: 'e, E: 'q>(
109//         self,
110//         query: E,
111//     ) -> futures_core::stream::BoxStream<
112//         'e,
113//         Result<
114//             crate::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
115//             crate::error::Error,
116//         >,
117//     >
118//     where
119//         't: 'e,
120//         E: crate::executor::Execute<'q, Self::Database>,
121//     {
122//         (&mut **self).fetch_many(query)
123//     }
124//
125//     fn fetch_optional<'e, 'q: 'e, E: 'q>(
126//         self,
127//         query: E,
128//     ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
129//     where
130//         't: 'e,
131//         E: crate::executor::Execute<'q, Self::Database>,
132//     {
133//         (&mut **self).fetch_optional(query)
134//     }
135//
136//     fn prepare_with<'e, 'q: 'e>(
137//         self,
138//         sql: &'q str,
139//         parameters: &'e [<Self::Database as crate::database::Database>::TypeInfo],
140//     ) -> futures_core::future::BoxFuture<
141//         'e,
142//         Result<
143//             <Self::Database as crate::database::HasStatement<'q>>::Statement,
144//             crate::error::Error,
145//         >,
146//     >
147//     where
148//         't: 'e,
149//     {
150//         (&mut **self).prepare_with(sql, parameters)
151//     }
152//
153//     #[doc(hidden)]
154//     fn describe<'e, 'q: 'e>(
155//         self,
156//         query: &'q str,
157//     ) -> futures_core::future::BoxFuture<
158//         'e,
159//         Result<crate::describe::Describe<Self::Database>, crate::error::Error>,
160//     >
161//     where
162//         't: 'e,
163//     {
164//         (&mut **self).describe(query)
165//     }
166// }
167
168impl<'c, DB> Debug for Transaction<'c, DB>
169where
170    DB: Database,
171{
172    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
173        // TODO: Show the full type <..<..<..
174        f.debug_struct("Transaction").finish()
175    }
176}
177
178impl<'c, DB> Deref for Transaction<'c, DB>
179where
180    DB: Database,
181{
182    type Target = DB::Connection;
183
184    #[inline]
185    fn deref(&self) -> &Self::Target {
186        &self.connection
187    }
188}
189
190impl<'c, DB> DerefMut for Transaction<'c, DB>
191where
192    DB: Database,
193{
194    #[inline]
195    fn deref_mut(&mut self) -> &mut Self::Target {
196        &mut self.connection
197    }
198}
199
200// Implement `AsMut<DB::Connection>` so `Transaction` can be given to a
201// `PgAdvisoryLockGuard`.
202//
203// See: https://github.com/launchbadge/sqlx/issues/2520
204impl<'c, DB: Database> AsMut<DB::Connection> for Transaction<'c, DB> {
205    fn as_mut(&mut self) -> &mut DB::Connection {
206        &mut self.connection
207    }
208}
209
210impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'c, DB> {
211    type Database = DB;
212
213    type Connection = &'t mut <DB as Database>::Connection;
214
215    #[inline]
216    fn acquire(self) -> BoxFuture<'t, Result<Self::Connection, Error>> {
217        Box::pin(futures_util::future::ok(&mut **self))
218    }
219
220    #[inline]
221    fn begin(self) -> BoxFuture<'t, Result<Transaction<'t, DB>, Error>> {
222        Transaction::begin(&mut **self)
223    }
224}
225
226impl<'c, DB> Drop for Transaction<'c, DB>
227where
228    DB: Database,
229{
230    fn drop(&mut self) {
231        if self.open {
232            // starts a rollback operation
233
234            // what this does depends on the database but generally this means we queue a rollback
235            // operation that will happen on the next asynchronous invocation of the underlying
236            // connection (including if the connection is returned to a pool)
237
238            DB::TransactionManager::start_rollback(&mut self.connection);
239        }
240    }
241}
242
243pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
244    if depth == 0 {
245        Cow::Borrowed("BEGIN")
246    } else {
247        Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
248    }
249}
250
251pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
252    if depth == 1 {
253        Cow::Borrowed("COMMIT")
254    } else {
255        Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
256    }
257}
258
259pub fn rollback_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
260    if depth == 1 {
261        Cow::Borrowed("ROLLBACK")
262    } else {
263        Cow::Owned(format!(
264            "ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
265            depth - 1
266        ))
267    }
268}