sqlx_build_trust_postgres/
transaction.rs1use futures_core::future::BoxFuture;
2
3use crate::error::Error;
4use crate::executor::Executor;
5
6use crate::{PgConnection, Postgres};
7
8pub(crate) use sqlx_core::transaction::*;
9
10pub struct PgTransactionManager;
12
13impl TransactionManager for PgTransactionManager {
14 type Database = Postgres;
15
16 fn begin(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
17 Box::pin(async move {
18 let rollback = Rollback::new(conn);
19 let query = begin_ansi_transaction_sql(rollback.conn.transaction_depth);
20 rollback.conn.queue_simple_query(&query);
21 rollback.conn.transaction_depth += 1;
22 rollback.conn.wait_until_ready().await?;
23 rollback.defuse();
24
25 Ok(())
26 })
27 }
28
29 fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
30 Box::pin(async move {
31 if conn.transaction_depth > 0 {
32 conn.execute(&*commit_ansi_transaction_sql(conn.transaction_depth))
33 .await?;
34
35 conn.transaction_depth -= 1;
36 }
37
38 Ok(())
39 })
40 }
41
42 fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
43 Box::pin(async move {
44 if conn.transaction_depth > 0 {
45 conn.execute(&*rollback_ansi_transaction_sql(conn.transaction_depth))
46 .await?;
47
48 conn.transaction_depth -= 1;
49 }
50
51 Ok(())
52 })
53 }
54
55 fn start_rollback(conn: &mut PgConnection) {
56 if conn.transaction_depth > 0 {
57 conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.transaction_depth));
58
59 conn.transaction_depth -= 1;
60 }
61 }
62}
63
64struct Rollback<'c> {
65 conn: &'c mut PgConnection,
66 defuse: bool,
67}
68
69impl Drop for Rollback<'_> {
70 fn drop(&mut self) {
71 if !self.defuse {
72 PgTransactionManager::start_rollback(self.conn)
73 }
74 }
75}
76
77impl<'c> Rollback<'c> {
78 fn new(conn: &'c mut PgConnection) -> Self {
79 Self {
80 conn,
81 defuse: false,
82 }
83 }
84 fn defuse(mut self) {
85 self.defuse = true;
86 }
87}