sqlx_postgres/
transaction.rs1use sqlx_core::database::Database;
2use sqlx_core::sql_str::SqlStr;
3
4use crate::error::Error;
5use crate::executor::Executor;
6
7use crate::{PgConnection, Postgres};
8
9pub(crate) use sqlx_core::transaction::*;
10
/// Zero-sized type that carries the [`TransactionManager`] implementation
/// for the [`Postgres`] database (see the `impl` below).
pub struct PgTransactionManager;
13
14impl TransactionManager for PgTransactionManager {
15    type Database = Postgres;
16
17    async fn begin(conn: &mut PgConnection, statement: Option<SqlStr>) -> Result<(), Error> {
18        let depth = conn.inner.transaction_depth;
19
20        let statement = match statement {
21            Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
24            Some(statement) => statement,
25            None => begin_ansi_transaction_sql(depth),
26        };
27
28        let rollback = Rollback::new(conn);
29        rollback.conn.queue_simple_query(statement.as_str())?;
30        rollback.conn.wait_until_ready().await?;
31        if !rollback.conn.in_transaction() {
32            return Err(Error::BeginFailed);
33        }
34        rollback.conn.inner.transaction_depth += 1;
35        rollback.defuse();
36
37        Ok(())
38    }
39
40    async fn commit(conn: &mut PgConnection) -> Result<(), Error> {
41        if conn.inner.transaction_depth > 0 {
42            conn.execute(commit_ansi_transaction_sql(conn.inner.transaction_depth))
43                .await?;
44
45            conn.inner.transaction_depth -= 1;
46        }
47
48        Ok(())
49    }
50
51    async fn rollback(conn: &mut PgConnection) -> Result<(), Error> {
52        if conn.inner.transaction_depth > 0 {
53            conn.execute(rollback_ansi_transaction_sql(conn.inner.transaction_depth))
54                .await?;
55
56            conn.inner.transaction_depth -= 1;
57        }
58
59        Ok(())
60    }
61
62    fn start_rollback(conn: &mut PgConnection) {
63        if conn.inner.transaction_depth > 0 {
64            conn.queue_simple_query(
65                rollback_ansi_transaction_sql(conn.inner.transaction_depth).as_str(),
66            )
67            .expect("BUG: Rollback query somehow too large for protocol");
68
69            conn.inner.transaction_depth -= 1;
70        }
71    }
72
73    fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
74        conn.inner.transaction_depth
75    }
76}
77
/// Drop guard around a borrowed connection: unless defused, dropping it
/// calls `PgTransactionManager::start_rollback` on the connection.
struct Rollback<'c> {
    /// Connection the guard acts on.
    conn: &'c mut PgConnection,
    /// When `true`, dropping the guard does nothing.
    defuse: bool,
}
82
83impl Drop for Rollback<'_> {
84    fn drop(&mut self) {
85        if !self.defuse {
86            PgTransactionManager::start_rollback(self.conn)
87        }
88    }
89}
90
91impl<'c> Rollback<'c> {
92    fn new(conn: &'c mut PgConnection) -> Self {
93        Self {
94            conn,
95            defuse: false,
96        }
97    }
98    fn defuse(mut self) {
99        self.defuse = true;
100    }
101}