arcly-http 0.4.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Diesel adapter — sync core fully isolated behind `spawn_blocking`.
//!
//! ## The blocking rule
//!
//! Classic Diesel (and its `r2d2` pool) is synchronous: `pool.get()` can
//! park a thread and queries block until complete. **Neither may ever run on
//! a tokio worker.** This adapter ships the *whole job* — pool acquisition,
//! queries, and the transaction — to the blocking thread pool as one
//! closure, so HTTP workers are never starved:
//!
//! ```ignore
//! let users = diesel_pool.run(|conn| {
//!     users::table.limit(10).load::<User>(conn)
//! }).await?;
//!
//! // Whole transaction in one closure — commit on Ok, rollback on Err:
//! diesel_pool.transaction(|conn| {
//!     diesel::insert_into(notes::table).values(&new_note).execute(conn)?;
//!     diesel::insert_into(outbox::table).values(&event).execute(conn)
//! }).await?;
//! ```
//!
//! Because a sync transaction cannot be held across `.await`,
//! `#[Transactional]` deliberately rejects Diesel pools (see `data::tx`) —
//! [`DieselBlockingPool::transaction`] is the supported equivalent, with the
//! same commit-on-Ok / rollback-on-Err contract enforced by Diesel itself.
//!
//! ## Sizing invariant
//!
//! Keep `r2d2` pool size ≤ tokio's blocking-thread budget (default 512);
//! otherwise jobs queue behind thread availability instead of pool
//! availability and the backpressure signal lands in the wrong place.

use std::sync::Arc;

use crate::data::DataError;

#[cfg(feature = "db-diesel-sqlite")]
pub type DieselConn = diesel::sqlite::SqliteConnection;
#[cfg(all(feature = "db-diesel-postgres", not(feature = "db-diesel-sqlite")))]
pub type DieselConn = diesel::pg::PgConnection;
#[cfg(all(
    feature = "db-diesel",
    not(any(feature = "db-diesel-sqlite", feature = "db-diesel-postgres")),
))]
compile_error!("enable a Diesel backend: db-diesel-sqlite or db-diesel-postgres");

type Pool = r2d2::Pool<diesel::r2d2::ConnectionManager<DieselConn>>;

/// Cloneable (Arc-backed) handle around a Diesel `r2d2` pool.
#[derive(Clone)]
pub struct DieselBlockingPool {
    pool: Arc<Pool>,
}

impl DieselBlockingPool {
    /// Build the pool. Connection establishment happens lazily per checkout.
    pub fn new(database_url: &str, max_size: u32) -> Result<Self, DataError> {
        let manager = diesel::r2d2::ConnectionManager::<DieselConn>::new(database_url);
        let pool = r2d2::Pool::builder()
            .max_size(max_size)
            .build(manager)
            .map_err(|e| DataError::config(format!("diesel pool build failed: {e}")))?;
        Ok(Self {
            pool: Arc::new(pool),
        })
    }

    /// Run `job` on the blocking pool. Pool checkout AND the queries execute
    /// off the async worker; the worker only awaits the join handle.
    pub async fn run<R, F>(&self, job: F) -> Result<R, DataError>
    where
        F: FnOnce(&mut DieselConn) -> Result<R, diesel::result::Error> + Send + 'static,
        R: Send + 'static,
    {
        let pool = self.pool.clone();
        tokio::task::spawn_blocking(move || {
            let mut conn = pool
                .get()
                .map_err(|e| DataError::connection(e.to_string()))?;
            job(&mut conn).map_err(|e| DataError::query(e.to_string()))
        })
        .await
        .map_err(|e| DataError::other(format!("blocking task failed: {e}")))?
    }

    /// Health ping: a pooled checkout on the blocking pool. `r2d2` validates
    /// connections on checkout, so success means the database answered.
    pub async fn ping(&self) -> Result<(), DataError> {
        let pool = self.pool.clone();
        tokio::task::spawn_blocking(move || {
            pool.get()
                .map(|_| ())
                .map_err(|e| DataError::connection(e.to_string()))
        })
        .await
        .map_err(|e| DataError::other(format!("blocking task failed: {e}")))?
    }

    /// Run `job` inside one Diesel transaction on the blocking pool:
    /// commit on `Ok`, rollback on `Err` — the closure-scoped equivalent of
    /// `#[Transactional]` for the sync ecosystem.
    pub async fn transaction<R, F>(&self, job: F) -> Result<R, DataError>
    where
        F: FnOnce(&mut DieselConn) -> Result<R, diesel::result::Error> + Send + 'static,
        R: Send + 'static,
    {
        use diesel::Connection;
        self.run(move |conn| conn.transaction(job)).await
    }
}