Skip to main content

arcly_http/data/
tx.rs

1//! Request-scoped transactions for `#[Transactional]`.
2//!
3//! ## Contract
4//!
5//! - `begin` on entering the handler, against the **primary** of the
6//!   request-tenant's pool.
7//! - **Commit** when the handler returns `Ok(..)`.
8//! - **Rollback** when it returns `Err(..)` — and on panic / `#[Timeout]`
9//!   expiry, because dropping an uncommitted driver transaction rolls back
10//!   in every supported ecosystem.
11//!
12//! ## Zero-lock mechanics
13//!
14//! The live transaction rides a `tokio::task_local!` slot — strictly
15//! per-request-task state, no global registry, no mutex. Services reach it
16//! through [`with_current_tx`], which *takes* the transaction out of the
17//! slot, awaits the closure, and puts it back — so no `RefCell` borrow is
18//! ever held across an `.await` (keeping handler futures `Send`).
19//!
20//! Diesel's sync core cannot hold a transaction across `.await` by design;
21//! `#[Transactional]` therefore rejects Diesel-backed pools at runtime with
22//! a clear error — use `DieselBlockingPool::transaction` (whole tx inside
23//! one blocking closure) instead.
24
25use std::cell::RefCell;
26
27use crate::data::db::ArclyDbPool;
28#[cfg(any(feature = "db-sqlx", feature = "db-seaorm", feature = "db-diesel"))]
29use crate::data::db::DbDriver;
30use crate::data::DataError;
31use crate::web::context::RequestContext;
32use crate::web::error::{HttpException, Internal};
33
34// ─── Transaction wrapper ──────────────────────────────────────────────────────
35
/// One live driver transaction. Dropping without commit rolls back.
///
/// Each variant is compiled in only when its driver feature is enabled, so
/// with no transactional driver feature this enum has no variants at all.
/// There is deliberately no Diesel variant: `ArclyDbPool::begin` rejects
/// Diesel-backed pools with an error instead of producing a transaction.
pub enum ArclyTransaction {
    /// A `sqlx` transaction over the `Any` database (feature `db-sqlx`).
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::Transaction<'static, sqlx::Any>),
    /// A SeaORM database transaction (feature `db-seaorm`).
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseTransaction),
}
43
44impl ArclyTransaction {
45    pub async fn commit(self) -> Result<(), DataError> {
46        match self {
47            #[cfg(feature = "db-sqlx")]
48            ArclyTransaction::Sqlx(tx) => tx.commit().await.map_err(|e| DataError(e.to_string())),
49            #[cfg(feature = "db-seaorm")]
50            ArclyTransaction::SeaOrm(tx) => tx.commit().await.map_err(|e| DataError(e.to_string())),
51            #[allow(unreachable_patterns)]
52            _ => Ok(()),
53        }
54    }
55
56    pub async fn rollback(self) -> Result<(), DataError> {
57        match self {
58            #[cfg(feature = "db-sqlx")]
59            ArclyTransaction::Sqlx(tx) => tx.rollback().await.map_err(|e| DataError(e.to_string())),
60            #[cfg(feature = "db-seaorm")]
61            ArclyTransaction::SeaOrm(tx) => {
62                tx.rollback().await.map_err(|e| DataError(e.to_string()))
63            }
64            #[allow(unreachable_patterns)]
65            _ => Ok(()),
66        }
67    }
68}
69
70impl ArclyDbPool {
71    /// Open a transaction on the **primary** driver of this pool.
72    #[allow(unreachable_code)]
73    pub async fn begin(&self) -> Result<ArclyTransaction, DataError> {
74        match self.primary() {
75            #[cfg(feature = "db-sqlx")]
76            DbDriver::Sqlx(pool) => Ok(ArclyTransaction::Sqlx(
77                pool.begin().await.map_err(|e| DataError(e.to_string()))?,
78            )),
79            #[cfg(feature = "db-seaorm")]
80            DbDriver::SeaOrm(conn) => {
81                use sea_orm::TransactionTrait;
82                Ok(ArclyTransaction::SeaOrm(
83                    conn.begin().await.map_err(|e| DataError(e.to_string()))?,
84                ))
85            }
86            #[cfg(feature = "db-diesel")]
87            DbDriver::Diesel(_) => Err(DataError(
88                "#[Transactional] is not supported on sync Diesel pools — \
89                 run the whole transaction inside DieselBlockingPool::transaction(…)"
90                    .into(),
91            )),
92            #[allow(unreachable_patterns)]
93            _ => Err(DataError("no database driver feature enabled".into())),
94        }
95    }
96}
97
98// ─── Task-local slot ──────────────────────────────────────────────────────────
99
tokio::task_local! {
    /// The current request's transaction. Per-task, never shared, no locks.
    ///
    /// Holds `Some(tx)` once `run_transactional` has scoped a transaction in.
    /// It is temporarily `None` while `with_current_tx` has moved the
    /// transaction out for its closure, and permanently `None` after
    /// `run_transactional` takes it for the final commit/rollback.
    static CURRENT_TX: RefCell<Option<ArclyTransaction>>;
}
104
105/// Run `work` with this request's transaction (if `#[Transactional]` opened
106/// one). The transaction is moved out of the slot for the duration of the
107/// closure and put back afterwards, so the future stays `Send`.
108///
109/// Returns `Ok(None)` when called outside a `#[Transactional]` scope —
110/// callers fall back to autocommit through the pool.
111pub async fn with_current_tx<R, F, Fut>(work: F) -> Result<Option<R>, DataError>
112where
113    F: FnOnce(ArclyTransaction) -> Fut,
114    Fut: std::future::Future<Output = (ArclyTransaction, Result<R, DataError>)>,
115{
116    // Take (not borrow) — the RefCell guard ends before any await.
117    let taken = CURRENT_TX
118        .try_with(|slot| slot.borrow_mut().take())
119        .ok()
120        .flatten();
121
122    let Some(tx) = taken else { return Ok(None) };
123
124    let (tx, result) = work(tx).await;
125
126    // Put back for the rest of the handler (and the final commit/rollback).
127    let _ = CURRENT_TX.try_with(|slot| *slot.borrow_mut() = Some(tx));
128    result.map(Some)
129}
130
131/// `true` when running inside a `#[Transactional]` scope.
132pub fn in_transaction() -> bool {
133    CURRENT_TX
134        .try_with(|slot| slot.borrow().is_some())
135        .unwrap_or(false)
136}
137
138// ─── Macro entry point ────────────────────────────────────────────────────────
139
140/// Called by the `#[Transactional]` expansion. Not public API.
141///
142/// Opens a transaction on the request-tenant's pool, scopes it into
143/// `CURRENT_TX`, runs the handler body, then commits on `Ok` / rolls back
144/// on `Err`. If the body's future is cancelled (`#[Timeout]`, client
145/// disconnect), the scoped transaction drops uncommitted → driver rollback.
146#[doc(hidden)]
147pub async fn run_transactional<T, Fut>(ctx: &RequestContext, body: Fut) -> Result<T, HttpException>
148where
149    Fut: std::future::Future<Output = Result<T, HttpException>>,
150{
151    let registry = ctx
152        .try_inject::<crate::data::DataSourceRegistry<ArclyDbPool>>()
153        .ok_or_else(|| {
154            Internal::new(
155                "#[Transactional] requires DataSourceRegistry<ArclyDbPool> in the DI container",
156            )
157        })?;
158
159    let pool = registry.for_tenant(ctx.tenant());
160    let tx = pool
161        .begin()
162        .await
163        .map_err(|e| Internal::new(format!("failed to begin transaction: {e}")))?;
164
165    CURRENT_TX
166        .scope(RefCell::new(Some(tx)), async move {
167            let outcome = body.await;
168
169            let tx = CURRENT_TX
170                .try_with(|slot| slot.borrow_mut().take())
171                .ok()
172                .flatten();
173
174            match (outcome, tx) {
175                (Ok(v), Some(tx)) => {
176                    tx.commit()
177                        .await
178                        .map_err(|e| Internal::new(format!("commit failed: {e}")))?;
179                    Ok(v)
180                }
181                // A service legitimately consumed/closed the tx itself.
182                (Ok(v), None) => Ok(v),
183                (Err(e), Some(tx)) => {
184                    if let Err(rb) = tx.rollback().await {
185                        tracing::error!(error = %rb, "rollback failed after handler error");
186                    }
187                    Err(e)
188                }
189                (Err(e), None) => Err(e),
190            }
191        })
192        .await
193}