1use futures_core::future::BoxFuture;
9use sqlx_core::error::Error;
10use sqlx_core::transaction::TransactionManager;
11
12use crate::connection::SpgConnection;
13use crate::database::Spg;
14use crate::error::engine_to_sqlx;
15
16#[derive(Debug)]
20pub struct SpgTransactionManager;
21
22impl TransactionManager for SpgTransactionManager {
23 type Database = Spg;
24
25 fn begin<'conn>(
26 conn: &'conn mut SpgConnection,
27 statement: Option<std::borrow::Cow<'static, str>>,
28 ) -> BoxFuture<'conn, Result<(), Error>> {
29 Box::pin(async move {
30 let sql = statement
31 .as_deref()
32 .map(std::string::ToString::to_string)
33 .unwrap_or_else(|| "BEGIN".to_string());
34 conn.inner.execute(&sql).await.map_err(engine_to_sqlx)?;
35 conn.tx_depth = conn.tx_depth.saturating_add(1);
36 Ok(())
37 })
38 }
39
40 fn commit(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
41 Box::pin(async move {
42 if conn.tx_depth == 0 {
43 return Err(engine_to_sqlx(
44 spg_embedded::EngineError::NoActiveTransaction,
45 ));
46 }
47 conn.inner.execute("COMMIT").await.map_err(engine_to_sqlx)?;
48 conn.tx_depth = conn.tx_depth.saturating_sub(1);
49 Ok(())
50 })
51 }
52
53 fn rollback(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
54 Box::pin(async move {
55 if conn.tx_depth == 0 {
56 return Err(engine_to_sqlx(
57 spg_embedded::EngineError::NoActiveTransaction,
58 ));
59 }
60 conn.inner
61 .execute("ROLLBACK")
62 .await
63 .map_err(engine_to_sqlx)?;
64 conn.tx_depth = conn.tx_depth.saturating_sub(1);
65 Ok(())
66 })
67 }
68
69 fn start_rollback(conn: &mut SpgConnection) {
70 if conn.tx_depth > 0 {
77 conn.tx_depth = conn.tx_depth.saturating_sub(1);
78 conn.pending_rollback = true;
79 }
80 }
81
82 fn get_transaction_depth(conn: &SpgConnection) -> usize {
83 conn.tx_depth
84 }
85}