sqlx_build_trust_postgres/
transaction.rs

1use futures_core::future::BoxFuture;
2
3use crate::error::Error;
4use crate::executor::Executor;
5
6use crate::{PgConnection, Postgres};
7
8pub(crate) use sqlx_core::transaction::*;
9
10/// Implementation of [`TransactionManager`] for PostgreSQL.
11pub struct PgTransactionManager;
12
13impl TransactionManager for PgTransactionManager {
14    type Database = Postgres;
15
16    fn begin(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
17        Box::pin(async move {
18            let rollback = Rollback::new(conn);
19            let query = begin_ansi_transaction_sql(rollback.conn.transaction_depth);
20            rollback.conn.queue_simple_query(&query);
21            rollback.conn.transaction_depth += 1;
22            rollback.conn.wait_until_ready().await?;
23            rollback.defuse();
24
25            Ok(())
26        })
27    }
28
29    fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
30        Box::pin(async move {
31            if conn.transaction_depth > 0 {
32                conn.execute(&*commit_ansi_transaction_sql(conn.transaction_depth))
33                    .await?;
34
35                conn.transaction_depth -= 1;
36            }
37
38            Ok(())
39        })
40    }
41
42    fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
43        Box::pin(async move {
44            if conn.transaction_depth > 0 {
45                conn.execute(&*rollback_ansi_transaction_sql(conn.transaction_depth))
46                    .await?;
47
48                conn.transaction_depth -= 1;
49            }
50
51            Ok(())
52        })
53    }
54
55    fn start_rollback(conn: &mut PgConnection) {
56        if conn.transaction_depth > 0 {
57            conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.transaction_depth));
58
59            conn.transaction_depth -= 1;
60        }
61    }
62}
63
64struct Rollback<'c> {
65    conn: &'c mut PgConnection,
66    defuse: bool,
67}
68
69impl Drop for Rollback<'_> {
70    fn drop(&mut self) {
71        if !self.defuse {
72            PgTransactionManager::start_rollback(self.conn)
73        }
74    }
75}
76
77impl<'c> Rollback<'c> {
78    fn new(conn: &'c mut PgConnection) -> Self {
79        Self {
80            conn,
81            defuse: false,
82        }
83    }
84    fn defuse(mut self) {
85        self.defuse = true;
86    }
87}