1use futures_core::future::BoxFuture;
9use sqlx_core::error::Error;
10use sqlx_core::transaction::TransactionManager;
11
12use crate::connection::SpgConnection;
13use crate::database::Spg;
14use crate::error::engine_to_sqlx;
15
16#[derive(Debug)]
20pub struct SpgTransactionManager;
21
22impl TransactionManager for SpgTransactionManager {
23 type Database = Spg;
24
25 fn begin<'conn>(
26 conn: &'conn mut SpgConnection,
27 statement: Option<std::borrow::Cow<'static, str>>,
28 ) -> BoxFuture<'conn, Result<(), Error>> {
29 Box::pin(async move {
30 let sql = statement
31 .as_deref()
32 .map(std::string::ToString::to_string)
33 .unwrap_or_else(|| "BEGIN".to_string());
34 conn.inner
35 .execute(&sql)
36 .await
37 .map_err(engine_to_sqlx)?;
38 conn.tx_depth = conn.tx_depth.saturating_add(1);
39 Ok(())
40 })
41 }
42
43 fn commit(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
44 Box::pin(async move {
45 if conn.tx_depth == 0 {
46 return Err(engine_to_sqlx(spg_embedded::EngineError::NoActiveTransaction));
47 }
48 conn.inner
49 .execute("COMMIT")
50 .await
51 .map_err(engine_to_sqlx)?;
52 conn.tx_depth = conn.tx_depth.saturating_sub(1);
53 Ok(())
54 })
55 }
56
57 fn rollback(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
58 Box::pin(async move {
59 if conn.tx_depth == 0 {
60 return Err(engine_to_sqlx(spg_embedded::EngineError::NoActiveTransaction));
61 }
62 conn.inner
63 .execute("ROLLBACK")
64 .await
65 .map_err(engine_to_sqlx)?;
66 conn.tx_depth = conn.tx_depth.saturating_sub(1);
67 Ok(())
68 })
69 }
70
71 fn start_rollback(conn: &mut SpgConnection) {
72 if conn.tx_depth > 0 {
79 conn.tx_depth = conn.tx_depth.saturating_sub(1);
80 conn.pending_rollback = true;
81 }
82 }
83
84 fn get_transaction_depth(conn: &SpgConnection) -> usize {
85 conn.tx_depth
86 }
87}