sqlx_postgres/
transaction.rs1use sqlx_core::database::Database;
2use sqlx_core::sql_str::SqlStr;
3
4use crate::error::Error;
5use crate::executor::Executor;
6
7use crate::{PgConnection, Postgres};
8
9pub(crate) use sqlx_core::transaction::*;
10
11pub struct PgTransactionManager;
13
14impl TransactionManager for PgTransactionManager {
15 type Database = Postgres;
16
17 async fn begin(conn: &mut PgConnection, statement: Option<SqlStr>) -> Result<(), Error> {
18 let depth = conn.inner.transaction_depth;
19
20 let statement = match statement {
21 Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
24 Some(statement) => statement,
25 None => begin_ansi_transaction_sql(depth),
26 };
27
28 let rollback = Rollback::new(conn);
29 rollback.conn.queue_simple_query(statement.as_str())?;
30 rollback.conn.wait_until_ready().await?;
31 if !rollback.conn.in_transaction() {
32 return Err(Error::BeginFailed);
33 }
34 rollback.conn.inner.transaction_depth += 1;
35 rollback.defuse();
36
37 Ok(())
38 }
39
40 async fn commit(conn: &mut PgConnection) -> Result<(), Error> {
41 if conn.inner.transaction_depth > 0 {
42 conn.execute(commit_ansi_transaction_sql(conn.inner.transaction_depth))
43 .await?;
44
45 conn.inner.transaction_depth -= 1;
46 }
47
48 Ok(())
49 }
50
51 async fn rollback(conn: &mut PgConnection) -> Result<(), Error> {
52 if conn.inner.transaction_depth > 0 {
53 conn.execute(rollback_ansi_transaction_sql(conn.inner.transaction_depth))
54 .await?;
55
56 conn.inner.transaction_depth -= 1;
57 }
58
59 Ok(())
60 }
61
62 fn start_rollback(conn: &mut PgConnection) {
63 if conn.inner.transaction_depth > 0 {
64 conn.queue_simple_query(
65 rollback_ansi_transaction_sql(conn.inner.transaction_depth).as_str(),
66 )
67 .expect("BUG: Rollback query somehow too large for protocol");
68
69 conn.inner.transaction_depth -= 1;
70 }
71 }
72
73 fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
74 conn.inner.transaction_depth
75 }
76}
77
78struct Rollback<'c> {
79 conn: &'c mut PgConnection,
80 defuse: bool,
81}
82
83impl Drop for Rollback<'_> {
84 fn drop(&mut self) {
85 if !self.defuse {
86 PgTransactionManager::start_rollback(self.conn)
87 }
88 }
89}
90
91impl<'c> Rollback<'c> {
92 fn new(conn: &'c mut PgConnection) -> Self {
93 Self {
94 conn,
95 defuse: false,
96 }
97 }
98 fn defuse(mut self) {
99 self.defuse = true;
100 }
101}