Skip to main content

spg_sqlx/
transaction.rs

1//! v7.16.0 — `sqlx::TransactionManager` for SPG.
2//!
3//! SPG-engine has single-writer BEGIN/COMMIT/ROLLBACK; the
4//! adapter wraps that. Nested transactions are NOT supported —
5//! same restriction as `spg-embedded::Database::with_transaction`.
6//! Save-points map to PG's SAVEPOINT semantics in v7.16.x.
7
8use futures_core::future::BoxFuture;
9use sqlx_core::error::Error;
10use sqlx_core::transaction::TransactionManager;
11
12use crate::connection::SpgConnection;
13use crate::database::Spg;
14use crate::error::engine_to_sqlx;
15
/// Zero-sized marker type that carries SPG's implementation of
/// `sqlx_core::transaction::TransactionManager`.
///
/// It holds no state of its own: `Connection::begin`,
/// `Transaction::commit` and `Transaction::rollback` are routed
/// through it to the engine's BEGIN / COMMIT / ROLLBACK statements.
#[derive(Debug)]
pub struct SpgTransactionManager;
21
22impl TransactionManager for SpgTransactionManager {
23    type Database = Spg;
24
25    fn begin<'conn>(
26        conn: &'conn mut SpgConnection,
27        statement: Option<std::borrow::Cow<'static, str>>,
28    ) -> BoxFuture<'conn, Result<(), Error>> {
29        Box::pin(async move {
30            let sql = statement
31                .as_deref()
32                .map(std::string::ToString::to_string)
33                .unwrap_or_else(|| "BEGIN".to_string());
34            conn.inner.execute(&sql).await.map_err(engine_to_sqlx)?;
35            conn.tx_depth = conn.tx_depth.saturating_add(1);
36            Ok(())
37        })
38    }
39
40    fn commit(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
41        Box::pin(async move {
42            if conn.tx_depth == 0 {
43                return Err(engine_to_sqlx(
44                    spg_embedded::EngineError::NoActiveTransaction,
45                ));
46            }
47            conn.inner.execute("COMMIT").await.map_err(engine_to_sqlx)?;
48            conn.tx_depth = conn.tx_depth.saturating_sub(1);
49            Ok(())
50        })
51    }
52
53    fn rollback(conn: &mut SpgConnection) -> BoxFuture<'_, Result<(), Error>> {
54        Box::pin(async move {
55            if conn.tx_depth == 0 {
56                return Err(engine_to_sqlx(
57                    spg_embedded::EngineError::NoActiveTransaction,
58                ));
59            }
60            conn.inner
61                .execute("ROLLBACK")
62                .await
63                .map_err(engine_to_sqlx)?;
64            conn.tx_depth = conn.tx_depth.saturating_sub(1);
65            Ok(())
66        })
67    }
68
69    fn start_rollback(conn: &mut SpgConnection) {
70        // v7.16.0 — best-effort sync rollback for the Drop
71        // path. We can't await here so the rollback is
72        // queued by clearing the depth; the next async-aware
73        // entry-point on the connection re-issues ROLLBACK if
74        // needed. mailrs's transaction code always reaches
75        // explicit commit/rollback under normal control flow.
76        if conn.tx_depth > 0 {
77            conn.tx_depth = conn.tx_depth.saturating_sub(1);
78            conn.pending_rollback = true;
79        }
80    }
81
82    fn get_transaction_depth(conn: &SpgConnection) -> usize {
83        conn.tx_depth
84    }
85}