arcly-http 0.2.1

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Request-scoped transactions for `#[Transactional]`.
//!
//! ## Contract
//!
//! - `begin` on entering the handler, against the **primary** of the
//!   request-tenant's pool.
//! - **Commit** when the handler returns `Ok(..)`.
//! - **Rollback** when it returns `Err(..)` — and on panic / `#[Timeout]`
//!   expiry, because dropping an uncommitted driver transaction rolls back
//!   in every supported ecosystem.
//!
//! ## Zero-lock mechanics
//!
//! The live transaction rides a `tokio::task_local!` slot — strictly
//! per-request-task state, no global registry, no mutex. Services reach it
//! through [`with_current_tx`], which *takes* the transaction out of the
//! slot, awaits the closure, and puts it back — so no `RefCell` borrow is
//! ever held across an `.await` (keeping handler futures `Send`).
//!
//! Diesel's sync core cannot hold a transaction across `.await` by design;
//! `#[Transactional]` therefore rejects Diesel-backed pools at runtime with
//! a clear error — use `DieselBlockingPool::transaction` (whole tx inside
//! one blocking closure) instead.

use std::cell::RefCell;

use crate::data::db::ArclyDbPool;
#[cfg(any(feature = "db-sqlx", feature = "db-seaorm", feature = "db-diesel"))]
use crate::data::db::DbDriver;
use crate::data::DataError;
use crate::web::context::RequestContext;
use crate::web::error::{HttpException, Internal};

// ─── Transaction wrapper ──────────────────────────────────────────────────────

/// One live driver transaction. Dropping without commit rolls back.
pub enum ArclyTransaction {
    #[cfg(feature = "db-sqlx")]
    Sqlx(sqlx::Transaction<'static, sqlx::Any>),
    #[cfg(feature = "db-seaorm")]
    SeaOrm(sea_orm::DatabaseTransaction),
}

impl ArclyTransaction {
    pub async fn commit(self) -> Result<(), DataError> {
        match self {
            #[cfg(feature = "db-sqlx")]
            ArclyTransaction::Sqlx(tx) => tx
                .commit()
                .await
                .map_err(|e| DataError::query(e.to_string())),
            #[cfg(feature = "db-seaorm")]
            ArclyTransaction::SeaOrm(tx) => tx
                .commit()
                .await
                .map_err(|e| DataError::query(e.to_string())),
            #[allow(unreachable_patterns)]
            _ => Ok(()),
        }
    }

    pub async fn rollback(self) -> Result<(), DataError> {
        match self {
            #[cfg(feature = "db-sqlx")]
            ArclyTransaction::Sqlx(tx) => tx
                .rollback()
                .await
                .map_err(|e| DataError::query(e.to_string())),
            #[cfg(feature = "db-seaorm")]
            ArclyTransaction::SeaOrm(tx) => tx
                .rollback()
                .await
                .map_err(|e| DataError::query(e.to_string())),
            #[allow(unreachable_patterns)]
            _ => Ok(()),
        }
    }
}

impl ArclyDbPool {
    /// Open a transaction on the **primary** driver of this pool.
    #[allow(unreachable_code)]
    pub async fn begin(&self) -> Result<ArclyTransaction, DataError> {
        match self.primary() {
            #[cfg(feature = "db-sqlx")]
            DbDriver::Sqlx(pool) => Ok(ArclyTransaction::Sqlx(
                pool.begin()
                    .await
                    .map_err(|e| DataError::connection(e.to_string()))?,
            )),
            #[cfg(feature = "db-seaorm")]
            DbDriver::SeaOrm(conn) => {
                use sea_orm::TransactionTrait;
                Ok(ArclyTransaction::SeaOrm(
                    conn.begin()
                        .await
                        .map_err(|e| DataError::connection(e.to_string()))?,
                ))
            }
            #[cfg(feature = "db-diesel")]
            DbDriver::Diesel(_) => Err(DataError::config(
                "#[Transactional] is not supported on sync Diesel pools — \
                 run the whole transaction inside DieselBlockingPool::transaction(…)",
            )),
            #[allow(unreachable_patterns)]
            _ => Err(DataError::config("no database driver feature enabled")),
        }
    }
}

// ─── Task-local slot ──────────────────────────────────────────────────────────

tokio::task_local! {
    /// The current request's transaction. Per-task, never shared, no locks.
    static CURRENT_TX: RefCell<Option<ArclyTransaction>>;
}

/// Run `work` with this request's transaction (if `#[Transactional]` opened
/// one). The transaction is moved out of the slot for the duration of the
/// closure and put back afterwards, so the future stays `Send`.
///
/// Returns `Ok(None)` when called outside a `#[Transactional]` scope —
/// callers fall back to autocommit through the pool.
pub async fn with_current_tx<R, F, Fut>(work: F) -> Result<Option<R>, DataError>
where
    F: FnOnce(ArclyTransaction) -> Fut,
    Fut: std::future::Future<Output = (ArclyTransaction, Result<R, DataError>)>,
{
    // Take (not borrow) — the RefCell guard ends before any await.
    let taken = CURRENT_TX
        .try_with(|slot| slot.borrow_mut().take())
        .ok()
        .flatten();

    let Some(tx) = taken else { return Ok(None) };

    let (tx, result) = work(tx).await;

    // Put back for the rest of the handler (and the final commit/rollback).
    let _ = CURRENT_TX.try_with(|slot| *slot.borrow_mut() = Some(tx));
    result.map(Some)
}

/// `true` when running inside a `#[Transactional]` scope.
pub fn in_transaction() -> bool {
    CURRENT_TX
        .try_with(|slot| slot.borrow().is_some())
        .unwrap_or(false)
}

// ─── Macro entry point ────────────────────────────────────────────────────────

/// Called by the `#[Transactional]` expansion. Not public API.
///
/// Opens a transaction on the request-tenant's pool, scopes it into
/// `CURRENT_TX`, runs the handler body, then commits on `Ok` / rolls back
/// on `Err`. If the body's future is cancelled (`#[Timeout]`, client
/// disconnect), the scoped transaction drops uncommitted → driver rollback.
#[doc(hidden)]
pub async fn run_transactional<T, Fut>(ctx: &RequestContext, body: Fut) -> Result<T, HttpException>
where
    Fut: std::future::Future<Output = Result<T, HttpException>>,
{
    let registry = ctx
        .try_inject::<crate::data::DataSourceRegistry<ArclyDbPool>>()
        .ok_or_else(|| {
            Internal::new(
                "#[Transactional] requires DataSourceRegistry<ArclyDbPool> in the DI container",
            )
        })?;

    let pool = registry.for_tenant(ctx.tenant());
    let tx = pool
        .begin()
        .await
        .map_err(|e| Internal::new(format!("failed to begin transaction: {e}")))?;

    CURRENT_TX
        .scope(RefCell::new(Some(tx)), async move {
            let outcome = body.await;

            let tx = CURRENT_TX
                .try_with(|slot| slot.borrow_mut().take())
                .ok()
                .flatten();

            match (outcome, tx) {
                (Ok(v), Some(tx)) => {
                    tx.commit()
                        .await
                        .map_err(|e| Internal::new(format!("commit failed: {e}")))?;
                    Ok(v)
                }
                // A service legitimately consumed/closed the tx itself.
                (Ok(v), None) => Ok(v),
                (Err(e), Some(tx)) => {
                    if let Err(rb) = tx.rollback().await {
                        tracing::error!(error = %rb, "rollback failed after handler error");
                    }
                    Err(e)
                }
                (Err(e), None) => Err(e),
            }
        })
        .await
}