sqlx_xugu/
transaction.rs

1use crate::connection::XuguConnection;
2use crate::protocol::text::Query;
3use crate::Xugu;
4use futures_core::future::BoxFuture;
5use sqlx_core::executor::Executor;
6use sqlx_core::transaction::*;
7use sqlx_core::Error;
8use std::borrow::Cow;
9
10/// Implementation of [`TransactionManager`] for Xugu.
11pub struct XuguTransactionManager;
12
13impl TransactionManager for XuguTransactionManager {
14    type Database = Xugu;
15
16    fn begin<'conn>(
17        conn: &'conn mut XuguConnection,
18        statement: Option<Cow<'static, str>>,
19    ) -> BoxFuture<'conn, Result<(), Error>> {
20        Box::pin(async move {
21            let depth = conn.inner.transaction_depth;
22            let statement = match statement {
23                // custom `BEGIN` statements are not allowed if we're already in a transaction
24                // (we need to issue a `SAVEPOINT` instead)
25                Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
26                Some(statement) => statement,
27                None => begin_ansi_transaction_sql(depth),
28            };
29            conn.execute(&*statement).await?;
30            if !conn.in_transaction() {
31                return Err(Error::BeginFailed);
32            }
33            conn.inner.transaction_depth += 1;
34
35            Ok(())
36        })
37    }
38
39    fn commit(conn: &mut XuguConnection) -> BoxFuture<'_, Result<(), Error>> {
40        Box::pin(async move {
41            let depth = conn.inner.transaction_depth;
42            if depth > 0 {
43                // 虚谷 v11 不支持 事务保存点的释放 RELEASE SAVEPOINT _sqlx_savepoint_1
44                // 所以忽略  RELEASE SAVEPOINT 的执行,只执行最后的的 COMMIT
45                if depth == 1 {
46                    conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
47                }
48
49                conn.inner.transaction_depth = depth - 1;
50            }
51
52            Ok(())
53        })
54    }
55
56    fn rollback(conn: &mut XuguConnection) -> BoxFuture<'_, Result<(), Error>> {
57        Box::pin(async move {
58            let depth = conn.inner.transaction_depth;
59
60            if depth > 0 {
61                conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
62                conn.inner.transaction_depth = depth - 1;
63            }
64
65            Ok(())
66        })
67    }
68
69    /// starts a rollback operation
70    ///
71    /// what this does depends on the database but generally this means we queue a rollback
72    /// operation that will happen on the next asynchronous invocation of the underlying
73    /// connection (including if the connection is returned to a pool)
74    fn start_rollback(conn: &mut XuguConnection) {
75        let depth = conn.inner.transaction_depth;
76
77        if depth > 0 {
78            conn.inner
79                .stream
80                .write_packet(Query(&rollback_ansi_transaction_sql(depth)))
81                .expect("BUG: unexpected error queueing ROLLBACK");
82            // Queue a simple query (not prepared) to execute the next time this connection is used.
83            conn.inner.pending_ready_for_query_count += 1;
84
85            conn.inner.transaction_depth = depth - 1;
86        }
87    }
88
89    fn get_transaction_depth(conn: &XuguConnection) -> usize {
90        conn.inner.transaction_depth
91    }
92}