Skip to main content

arcly_http/data/drivers/
diesel.rs

1//! Diesel adapter — sync core fully isolated behind `spawn_blocking`.
2//!
3//! ## The blocking rule
4//!
5//! Classic Diesel (and its `r2d2` pool) is synchronous: `pool.get()` can
6//! park a thread and queries block until complete. **Neither may ever run on
7//! a tokio worker.** This adapter ships the *whole job* — pool acquisition,
8//! queries, and the transaction — to the blocking thread pool as one
9//! closure, so HTTP workers are never starved:
10//!
11//! ```ignore
12//! let users = diesel_pool.run(|conn| {
13//!     users::table.limit(10).load::<User>(conn)
14//! }).await?;
15//!
16//! // Whole transaction in one closure — commit on Ok, rollback on Err:
17//! diesel_pool.transaction(|conn| {
18//!     diesel::insert_into(notes::table).values(&new_note).execute(conn)?;
19//!     diesel::insert_into(outbox::table).values(&event).execute(conn)
20//! }).await?;
21//! ```
22//!
23//! Because a sync transaction cannot be held across `.await`,
24//! `#[Transactional]` deliberately rejects Diesel pools (see `data::tx`) —
25//! [`DieselBlockingPool::transaction`] is the supported equivalent, with the
26//! same commit-on-Ok / rollback-on-Err contract enforced by Diesel itself.
27//!
28//! ## Sizing invariant
29//!
30//! Keep `r2d2` pool size ≤ tokio's blocking-thread budget (default 512);
31//! otherwise jobs queue behind thread availability instead of pool
32//! availability and the backpressure signal lands in the wrong place.
33
34use std::sync::Arc;
35
36use crate::data::DataError;
37
38#[cfg(feature = "db-diesel-sqlite")]
39pub type DieselConn = diesel::sqlite::SqliteConnection;
40#[cfg(all(feature = "db-diesel-postgres", not(feature = "db-diesel-sqlite")))]
41pub type DieselConn = diesel::pg::PgConnection;
42#[cfg(all(
43    feature = "db-diesel",
44    not(any(feature = "db-diesel-sqlite", feature = "db-diesel-postgres")),
45))]
46compile_error!("enable a Diesel backend: db-diesel-sqlite or db-diesel-postgres");
47
48type Pool = r2d2::Pool<diesel::r2d2::ConnectionManager<DieselConn>>;
49
50/// Cloneable (Arc-backed) handle around a Diesel `r2d2` pool.
51#[derive(Clone)]
52pub struct DieselBlockingPool {
53    pool: Arc<Pool>,
54}
55
56impl DieselBlockingPool {
57    /// Build the pool. Connection establishment happens lazily per checkout.
58    pub fn new(database_url: &str, max_size: u32) -> Result<Self, DataError> {
59        let manager = diesel::r2d2::ConnectionManager::<DieselConn>::new(database_url);
60        let pool = r2d2::Pool::builder()
61            .max_size(max_size)
62            .build(manager)
63            .map_err(|e| DataError::config(format!("diesel pool build failed: {e}")))?;
64        Ok(Self {
65            pool: Arc::new(pool),
66        })
67    }
68
69    /// Run `job` on the blocking pool. Pool checkout AND the queries execute
70    /// off the async worker; the worker only awaits the join handle.
71    pub async fn run<R, F>(&self, job: F) -> Result<R, DataError>
72    where
73        F: FnOnce(&mut DieselConn) -> Result<R, diesel::result::Error> + Send + 'static,
74        R: Send + 'static,
75    {
76        let pool = self.pool.clone();
77        tokio::task::spawn_blocking(move || {
78            let mut conn = pool
79                .get()
80                .map_err(|e| DataError::connection(e.to_string()))?;
81            job(&mut conn).map_err(|e| DataError::query(e.to_string()))
82        })
83        .await
84        .map_err(|e| DataError::other(format!("blocking task failed: {e}")))?
85    }
86
87    /// Health ping: a pooled checkout on the blocking pool. `r2d2` validates
88    /// connections on checkout, so success means the database answered.
89    pub async fn ping(&self) -> Result<(), DataError> {
90        let pool = self.pool.clone();
91        tokio::task::spawn_blocking(move || {
92            pool.get()
93                .map(|_| ())
94                .map_err(|e| DataError::connection(e.to_string()))
95        })
96        .await
97        .map_err(|e| DataError::other(format!("blocking task failed: {e}")))?
98    }
99
100    /// Run `job` inside one Diesel transaction on the blocking pool:
101    /// commit on `Ok`, rollback on `Err` — the closure-scoped equivalent of
102    /// `#[Transactional]` for the sync ecosystem.
103    pub async fn transaction<R, F>(&self, job: F) -> Result<R, DataError>
104    where
105        F: FnOnce(&mut DieselConn) -> Result<R, diesel::result::Error> + Send + 'static,
106        R: Send + 'static,
107    {
108        use diesel::Connection;
109        self.run(move |conn| conn.transaction(job)).await
110    }
111}