// sqlx_postgres/transaction.rs

1use futures_core::future::BoxFuture;
2use sqlx_core::database::Database;
3use std::borrow::Cow;
4
5use crate::error::Error;
6use crate::executor::Executor;
7
8use crate::{PgConnection, Postgres};
9
10pub(crate) use sqlx_core::transaction::*;
11
/// Implementation of [`TransactionManager`] for PostgreSQL.
///
/// This is a zero-sized marker type: it carries no state of its own. All of
/// its behaviour lives in the associated functions of the trait impl, which
/// operate on a `PgConnection` and track the current nesting level in the
/// connection's `inner.transaction_depth` counter.
#[derive(Debug)]
pub struct PgTransactionManager;
14
15impl TransactionManager for PgTransactionManager {
16    type Database = Postgres;
17
18    fn begin<'conn>(
19        conn: &'conn mut PgConnection,
20        statement: Option<Cow<'static, str>>,
21    ) -> BoxFuture<'conn, Result<(), Error>> {
22        Box::pin(async move {
23            let depth = conn.inner.transaction_depth;
24            let statement = match statement {
25                // custom `BEGIN` statements are not allowed if we're already in
26                // a transaction (we need to issue a `SAVEPOINT` instead)
27                Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
28                Some(statement) => statement,
29                None => begin_ansi_transaction_sql(depth),
30            };
31
32            let rollback = Rollback::new(conn);
33            rollback.conn.queue_simple_query(&statement)?;
34            rollback.conn.wait_until_ready().await?;
35            if !rollback.conn.in_transaction() {
36                return Err(Error::BeginFailed);
37            }
38            rollback.conn.inner.transaction_depth += 1;
39            rollback.defuse();
40
41            Ok(())
42        })
43    }
44
45    fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
46        Box::pin(async move {
47            if conn.inner.transaction_depth > 0 {
48                conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth))
49                    .await?;
50
51                conn.inner.transaction_depth -= 1;
52            }
53
54            Ok(())
55        })
56    }
57
58    fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
59        Box::pin(async move {
60            if conn.inner.transaction_depth > 0 {
61                conn.execute(&*rollback_ansi_transaction_sql(
62                    conn.inner.transaction_depth,
63                ))
64                .await?;
65
66                conn.inner.transaction_depth -= 1;
67            }
68
69            Ok(())
70        })
71    }
72
73    fn start_rollback(conn: &mut PgConnection) {
74        if conn.inner.transaction_depth > 0 {
75            conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.inner.transaction_depth))
76                .expect("BUG: Rollback query somehow too large for protocol");
77
78            conn.inner.transaction_depth -= 1;
79        }
80    }
81
82    fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
83        conn.inner.transaction_depth
84    }
85}
86
87struct Rollback<'c> {
88    conn: &'c mut PgConnection,
89    defuse: bool,
90}
91
92impl Drop for Rollback<'_> {
93    fn drop(&mut self) {
94        if !self.defuse {
95            PgTransactionManager::start_rollback(self.conn)
96        }
97    }
98}
99
100impl<'c> Rollback<'c> {
101    fn new(conn: &'c mut PgConnection) -> Self {
102        Self {
103            conn,
104            defuse: false,
105        }
106    }
107    fn defuse(mut self) {
108        self.defuse = true;
109    }
110}