async_bb8_diesel/
connection_manager.rs

1//! An async-safe connection pool for Diesel.
2
3use crate::{Connection, ConnectionError};
4use async_trait::async_trait;
5use diesel::r2d2::{self, ManageConnection, R2D2Connection};
6use std::sync::{Arc, Mutex};
7
8/// A connection manager which implements [`bb8::ManageConnection`] to
9/// integrate with bb8.
10///
11/// ```no_run
12/// use async_bb8_diesel::AsyncRunQueryDsl;
13/// use diesel::prelude::*;
14/// use diesel::pg::PgConnection;
15///
16/// table! {
17///     users (id) {
18///         id -> Integer,
19///     }
20/// }
21///
22/// #[tokio::main]
23/// async fn main() {
24///     use users::dsl;
25///
26///     // Creates a Diesel-specific connection manager for bb8.
27///     let mgr = async_bb8_diesel::ConnectionManager::<PgConnection>::new("localhost:1234");
28///     let pool = bb8::Pool::builder().build(mgr).await.unwrap();
29///
30///     diesel::insert_into(dsl::users)
31///         .values(dsl::id.eq(1337))
32///         .execute_async(&*pool.get().await.unwrap())
33///         .await
34///         .unwrap();
35/// }
36/// ```
37#[derive(Clone)]
38pub struct ConnectionManager<T> {
39    inner: Arc<Mutex<r2d2::ConnectionManager<T>>>,
40}
41
42impl<T: Send + 'static> ConnectionManager<T> {
43    pub fn new<S: Into<String>>(database_url: S) -> Self {
44        Self {
45            inner: Arc::new(Mutex::new(r2d2::ConnectionManager::new(database_url))),
46        }
47    }
48
49    async fn run_blocking<R, F>(&self, f: F) -> R
50    where
51        R: Send + 'static,
52        F: Send + 'static + FnOnce(&r2d2::ConnectionManager<T>) -> R,
53    {
54        let cloned = self.inner.clone();
55        tokio::task::spawn_blocking(move || f(&*cloned.lock().unwrap()))
56            .await
57            // Intentionally panic if the inner closure panics.
58            .unwrap()
59    }
60}
61
62#[async_trait]
63impl<T> bb8::ManageConnection for ConnectionManager<T>
64where
65    T: R2D2Connection + Send + 'static,
66{
67    type Connection = Connection<T>;
68    type Error = ConnectionError;
69
70    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
71        self.run_blocking(|m| m.connect())
72            .await
73            .map(Connection::new)
74            .map_err(ConnectionError::Connection)
75    }
76
77    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
78        let c = Connection(conn.0.clone());
79        self.run_blocking(move |m| {
80            m.is_valid(&mut *c.inner())?;
81            Ok(())
82        })
83        .await
84    }
85
86    fn has_broken(&self, _: &mut Self::Connection) -> bool {
87        // Diesel returns this value internally. We have no way of calling the
88        // inner method without blocking as this method is not async, but `bb8`
89        // indicates that this method is not mandatory.
90        false
91    }
92}