Skip to main content

spg_sqlx/
transaction.rs

1//! v7.16.0 — `sqlx::TransactionManager` for SPG.
2//!
3//! SPG-engine has single-writer BEGIN/COMMIT/ROLLBACK; the
4//! adapter wraps that. Nested transactions are NOT supported —
5//! same restriction as `spg-embedded::Database::with_transaction`.
6//! Save-points map to PG's SAVEPOINT semantics in v7.16.x.
7
8use futures_core::future::BoxFuture;
9use sqlx_core::error::Error;
10use sqlx_core::transaction::TransactionManager;
11
12use crate::connection::SpgConnection;
13use crate::database::Spg;
14use crate::error::engine_to_sqlx;
15
16/// Wires `Connection::begin` / `Transaction::commit` /
17/// `Transaction::rollback` to engine-side
18/// BEGIN/COMMIT/ROLLBACK statements.
19#[derive(Debug)]
20pub struct SpgTransactionManager;
21
22impl TransactionManager for SpgTransactionManager {
23    type Database = Spg;
24
25    fn begin<'conn>(
26        conn: &'conn mut SpgConnection,
27        statement: Option<std::borrow::Cow<'static, str>>,
28    ) -> BoxFuture<'conn, Result<(), Error>> {
29        Box::pin(async move {
30            let sql = statement
31                .as_deref()
32                .map(std::string::ToString::to_string)
33                .unwrap_or_else(|| "BEGIN".to_string());
34            conn.inner
35                .execute(&sql)
36                .await
37                .map_err(engine_to_sqlx)?;
38            conn.tx_depth = conn.tx_depth.saturating_add(1);
39            Ok(())
40        })
41    }
42
43    fn commit(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
44        Box::pin(async move {
45            if conn.tx_depth == 0 {
46                return Err(engine_to_sqlx(spg_embedded::EngineError::NoActiveTransaction));
47            }
48            conn.inner
49                .execute("COMMIT")
50                .await
51                .map_err(engine_to_sqlx)?;
52            conn.tx_depth = conn.tx_depth.saturating_sub(1);
53            Ok(())
54        })
55    }
56
57    fn rollback(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
58        Box::pin(async move {
59            if conn.tx_depth == 0 {
60                return Err(engine_to_sqlx(spg_embedded::EngineError::NoActiveTransaction));
61            }
62            conn.inner
63                .execute("ROLLBACK")
64                .await
65                .map_err(engine_to_sqlx)?;
66            conn.tx_depth = conn.tx_depth.saturating_sub(1);
67            Ok(())
68        })
69    }
70
71    fn start_rollback(conn: &mut SpgConnection) {
72        // v7.16.0 — best-effort sync rollback for the Drop
73        // path. We can't await here so the rollback is
74        // queued by clearing the depth; the next async-aware
75        // entry-point on the connection re-issues ROLLBACK if
76        // needed. mailrs's transaction code always reaches
77        // explicit commit/rollback under normal control flow.
78        if conn.tx_depth > 0 {
79            conn.tx_depth = conn.tx_depth.saturating_sub(1);
80            conn.pending_rollback = true;
81        }
82    }
83
84    fn get_transaction_depth(conn: &SpgConnection) -> usize {
85        conn.tx_depth
86    }
87}