sqlx_mysql/
transaction.rs1use sqlx_core::sql_str::SqlStr;
2
3use crate::connection::Waiting;
4use crate::error::Error;
5use crate::executor::Executor;
6use crate::protocol::text::Query;
7use crate::{MySql, MySqlConnection};
8
9pub(crate) use sqlx_core::transaction::*;
10
11pub struct MySqlTransactionManager;
13
14impl TransactionManager for MySqlTransactionManager {
15 type Database = MySql;
16
17 async fn begin(conn: &mut MySqlConnection, statement: Option<SqlStr>) -> Result<(), Error> {
18 let depth = conn.inner.transaction_depth;
19
20 let statement = match statement {
21 Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
24 Some(statement) => statement,
25 None => begin_ansi_transaction_sql(depth),
26 };
27 conn.execute(statement).await?;
28 if !conn.in_transaction() {
29 return Err(Error::BeginFailed);
30 }
31 conn.inner.transaction_depth += 1;
32
33 Ok(())
34 }
35
36 async fn commit(conn: &mut MySqlConnection) -> Result<(), Error> {
37 let depth = conn.inner.transaction_depth;
38
39 if depth > 0 {
40 conn.execute(commit_ansi_transaction_sql(depth)).await?;
41 conn.inner.transaction_depth = depth - 1;
42 }
43
44 Ok(())
45 }
46
47 async fn rollback(conn: &mut MySqlConnection) -> Result<(), Error> {
48 let depth = conn.inner.transaction_depth;
49
50 if depth > 0 {
51 conn.execute(rollback_ansi_transaction_sql(depth)).await?;
52 conn.inner.transaction_depth = depth - 1;
53 }
54
55 Ok(())
56 }
57
58 fn start_rollback(conn: &mut MySqlConnection) {
59 let depth = conn.inner.transaction_depth;
60
61 if depth > 0 {
62 conn.inner.stream.waiting.push_back(Waiting::Result);
63 conn.inner.stream.sequence_id = 0;
64 conn.inner
65 .stream
66 .write_packet(Query(rollback_ansi_transaction_sql(depth).as_str()))
67 .expect("BUG: unexpected error queueing ROLLBACK");
68
69 conn.inner.transaction_depth = depth - 1;
70 }
71 }
72
73 fn get_transaction_depth(conn: &MySqlConnection) -> usize {
74 conn.inner.transaction_depth
75 }
76}