sqlx_mysql/
transaction.rs

1use sqlx_core::sql_str::SqlStr;
2
3use crate::connection::Waiting;
4use crate::error::Error;
5use crate::executor::Executor;
6use crate::protocol::text::Query;
7use crate::{MySql, MySqlConnection};
8
9pub(crate) use sqlx_core::transaction::*;
10
/// Implementation of [`TransactionManager`] for MySQL.
///
/// This is a stateless marker type: every operation is an associated function that
/// receives the [`MySqlConnection`] to act on, and the nesting depth is tracked on the
/// connection itself (`conn.inner.transaction_depth`), not on this struct.
pub struct MySqlTransactionManager;
13
14impl TransactionManager for MySqlTransactionManager {
15    type Database = MySql;
16
17    async fn begin(conn: &mut MySqlConnection, statement: Option<SqlStr>) -> Result<(), Error> {
18        let depth = conn.inner.transaction_depth;
19
20        let statement = match statement {
21            // custom `BEGIN` statements are not allowed if we're already in a transaction
22            // (we need to issue a `SAVEPOINT` instead)
23            Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
24            Some(statement) => statement,
25            None => begin_ansi_transaction_sql(depth),
26        };
27        conn.execute(statement).await?;
28        if !conn.in_transaction() {
29            return Err(Error::BeginFailed);
30        }
31        conn.inner.transaction_depth += 1;
32
33        Ok(())
34    }
35
36    async fn commit(conn: &mut MySqlConnection) -> Result<(), Error> {
37        let depth = conn.inner.transaction_depth;
38
39        if depth > 0 {
40            conn.execute(commit_ansi_transaction_sql(depth)).await?;
41            conn.inner.transaction_depth = depth - 1;
42        }
43
44        Ok(())
45    }
46
47    async fn rollback(conn: &mut MySqlConnection) -> Result<(), Error> {
48        let depth = conn.inner.transaction_depth;
49
50        if depth > 0 {
51            conn.execute(rollback_ansi_transaction_sql(depth)).await?;
52            conn.inner.transaction_depth = depth - 1;
53        }
54
55        Ok(())
56    }
57
58    fn start_rollback(conn: &mut MySqlConnection) {
59        let depth = conn.inner.transaction_depth;
60
61        if depth > 0 {
62            conn.inner.stream.waiting.push_back(Waiting::Result);
63            conn.inner.stream.sequence_id = 0;
64            conn.inner
65                .stream
66                .write_packet(Query(rollback_ansi_transaction_sql(depth).as_str()))
67                .expect("BUG: unexpected error queueing ROLLBACK");
68
69            conn.inner.transaction_depth = depth - 1;
70        }
71    }
72
73    fn get_transaction_depth(conn: &MySqlConnection) -> usize {
74        conn.inner.transaction_depth
75    }
76}