sqlx_postgres/
transaction.rs1use futures_core::future::BoxFuture;
2use sqlx_core::database::Database;
3use std::borrow::Cow;
4
5use crate::error::Error;
6use crate::executor::Executor;
7
8use crate::{PgConnection, Postgres};
9
10pub(crate) use sqlx_core::transaction::*;
11
12pub struct PgTransactionManager;
14
15impl TransactionManager for PgTransactionManager {
16 type Database = Postgres;
17
18 fn begin<'conn>(
19 conn: &'conn mut PgConnection,
20 statement: Option<Cow<'static, str>>,
21 ) -> BoxFuture<'conn, Result<(), Error>> {
22 Box::pin(async move {
23 let depth = conn.inner.transaction_depth;
24 let statement = match statement {
25 Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
28 Some(statement) => statement,
29 None => begin_ansi_transaction_sql(depth),
30 };
31
32 let rollback = Rollback::new(conn);
33 rollback.conn.queue_simple_query(&statement)?;
34 rollback.conn.wait_until_ready().await?;
35 if !rollback.conn.in_transaction() {
36 return Err(Error::BeginFailed);
37 }
38 rollback.conn.inner.transaction_depth += 1;
39 rollback.defuse();
40
41 Ok(())
42 })
43 }
44
45 fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
46 Box::pin(async move {
47 if conn.inner.transaction_depth > 0 {
48 conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth))
49 .await?;
50
51 conn.inner.transaction_depth -= 1;
52 }
53
54 Ok(())
55 })
56 }
57
58 fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
59 Box::pin(async move {
60 if conn.inner.transaction_depth > 0 {
61 conn.execute(&*rollback_ansi_transaction_sql(
62 conn.inner.transaction_depth,
63 ))
64 .await?;
65
66 conn.inner.transaction_depth -= 1;
67 }
68
69 Ok(())
70 })
71 }
72
73 fn start_rollback(conn: &mut PgConnection) {
74 if conn.inner.transaction_depth > 0 {
75 conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.inner.transaction_depth))
76 .expect("BUG: Rollback query somehow too large for protocol");
77
78 conn.inner.transaction_depth -= 1;
79 }
80 }
81
82 fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
83 conn.inner.transaction_depth
84 }
85}
86
87struct Rollback<'c> {
88 conn: &'c mut PgConnection,
89 defuse: bool,
90}
91
92impl Drop for Rollback<'_> {
93 fn drop(&mut self) {
94 if !self.defuse {
95 PgTransactionManager::start_rollback(self.conn)
96 }
97 }
98}
99
100impl<'c> Rollback<'c> {
101 fn new(conn: &'c mut PgConnection) -> Self {
102 Self {
103 conn,
104 defuse: false,
105 }
106 }
107 fn defuse(mut self) {
108 self.defuse = true;
109 }
110}