diesel_async/pooled_connection/
mod.rs

1//! This module contains support using diesel-async with
2//! various async rust connection pooling solutions
3//!
4//! See the concrete pool implementations for examples:
5//! * [deadpool](self::deadpool)
6//! * [bb8](self::bb8)
7//! * [mobc](self::mobc)
8use crate::{AsyncConnection, TransactionManager};
9use diesel::QueryResult;
10use futures_core::future::BoxFuture;
11use futures_util::FutureExt;
12use std::borrow::Cow;
13use std::fmt;
14use std::future::Future;
15
16#[cfg(feature = "bb8")]
17pub mod bb8;
18#[cfg(feature = "deadpool")]
19pub mod deadpool;
20#[cfg(feature = "mobc")]
21pub mod mobc;
22
23/// The error used when managing connections with `deadpool`.
24#[derive(Debug)]
25pub enum PoolError {
26    /// An error occurred establishing the connection
27    ConnectionError(diesel::result::ConnectionError),
28
29    /// An error occurred pinging the database
30    QueryError(diesel::result::Error),
31}
32
33impl fmt::Display for PoolError {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        match *self {
36            PoolError::ConnectionError(ref e) => e.fmt(f),
37            PoolError::QueryError(ref e) => e.fmt(f),
38        }
39    }
40}
41
42impl std::error::Error for PoolError {}
43
44/// Type of the custom setup closure passed to [`ManagerConfig::custom_setup`]
45pub type SetupCallback<C> =
46    Box<dyn Fn(&str) -> BoxFuture<diesel::ConnectionResult<C>> + Send + Sync>;
47
48/// Type of the recycle check callback for the [`RecyclingMethod::CustomFunction`] variant
49pub type RecycleCheckCallback<C> = dyn Fn(&mut C) -> BoxFuture<QueryResult<()>> + Send + Sync;
50
51/// Possible methods of how a connection is recycled.
52#[derive(Default)]
53pub enum RecyclingMethod<C> {
54    /// Only check for open transactions when recycling existing connections
55    /// Unless you have special needs this is a safe choice.
56    ///
57    /// If the database connection is closed you will recieve an error on the first place
58    /// you actually try to use the connection
59    Fast,
60    /// In addition to checking for open transactions a test query is executed
61    ///
62    /// This is slower, but guarantees that the database connection is ready to be used.
63    #[default]
64    Verified,
65    /// Like `Verified` but with a custom query
66    CustomQuery(Cow<'static, str>),
67    /// Like `Verified` but with a custom callback that allows to perform more checks
68    ///
69    /// The connection is only recycled if the callback returns `Ok(())`
70    CustomFunction(Box<RecycleCheckCallback<C>>),
71}
72
73impl<C: fmt::Debug> fmt::Debug for RecyclingMethod<C> {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        match self {
76            Self::Fast => write!(f, "Fast"),
77            Self::Verified => write!(f, "Verified"),
78            Self::CustomQuery(arg0) => f.debug_tuple("CustomQuery").field(arg0).finish(),
79            Self::CustomFunction(_) => f.debug_tuple("CustomFunction").finish(),
80        }
81    }
82}
83
84/// Configuration object for a Manager.
85///
86/// This makes it possible to specify which [`RecyclingMethod`]
87/// should be used when retrieving existing objects from the `Pool`
88/// and it allows to provide a custom setup function.
89#[non_exhaustive]
90pub struct ManagerConfig<C> {
91    /// Method of how a connection is recycled. See [RecyclingMethod].
92    pub recycling_method: RecyclingMethod<C>,
93    /// Construct a new connection manger
94    /// with a custom setup procedure
95    ///
96    /// This can be used to for example establish a SSL secured
97    /// postgres connection
98    pub custom_setup: SetupCallback<C>,
99}
100
101impl<C> Default for ManagerConfig<C>
102where
103    C: AsyncConnection + 'static,
104{
105    fn default() -> Self {
106        Self {
107            recycling_method: Default::default(),
108            custom_setup: Box::new(|url| C::establish(url).boxed()),
109        }
110    }
111}
112
113/// An connection manager for use with diesel-async.
114///
115/// See the concrete pool implementations for examples:
116/// * [deadpool](self::deadpool)
117/// * [bb8](self::bb8)
118/// * [mobc](self::mobc)
119#[allow(dead_code)]
120pub struct AsyncDieselConnectionManager<C> {
121    connection_url: String,
122    manager_config: ManagerConfig<C>,
123}
124
125impl<C> fmt::Debug for AsyncDieselConnectionManager<C> {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        write!(
128            f,
129            "AsyncDieselConnectionManager<{}>",
130            std::any::type_name::<C>()
131        )
132    }
133}
134
135impl<C> AsyncDieselConnectionManager<C>
136where
137    C: AsyncConnection + 'static,
138{
139    /// Returns a new connection manager,
140    /// which establishes connections to the given database URL.
141    #[must_use]
142    pub fn new(connection_url: impl Into<String>) -> Self
143    where
144        C: AsyncConnection + 'static,
145    {
146        Self::new_with_config(connection_url, Default::default())
147    }
148
149    /// Returns a new connection manager,
150    /// which establishes connections with the given database URL
151    /// and that uses the specified configuration
152    #[must_use]
153    pub fn new_with_config(
154        connection_url: impl Into<String>,
155        manager_config: ManagerConfig<C>,
156    ) -> Self {
157        Self {
158            connection_url: connection_url.into(),
159            manager_config,
160        }
161    }
162}
163
164#[doc(hidden)]
165pub trait PoolableConnection: AsyncConnection {
166    /// Check if a connection is still valid
167    ///
168    /// The default implementation will perform a check based on the provided
169    /// recycling method variant
170    fn ping(
171        &mut self,
172        config: &RecyclingMethod<Self>,
173    ) -> impl Future<Output = diesel::QueryResult<()>> + Send
174    where
175        for<'a> Self: 'a,
176        diesel::dsl::select<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
177            crate::methods::ExecuteDsl<Self>,
178        diesel::query_builder::SqlQuery: crate::methods::ExecuteDsl<Self>,
179    {
180        use crate::run_query_dsl::RunQueryDsl;
181        use diesel::IntoSql;
182
183        async move {
184            match config {
185                RecyclingMethod::Fast => Ok(()),
186                RecyclingMethod::Verified => {
187                    diesel::select(1_i32.into_sql::<diesel::sql_types::Integer>())
188                        .execute(self)
189                        .await
190                        .map(|_| ())
191                }
192                RecyclingMethod::CustomQuery(query) => diesel::sql_query(query.as_ref())
193                    .execute(self)
194                    .await
195                    .map(|_| ()),
196                RecyclingMethod::CustomFunction(c) => c(self).await,
197            }
198        }
199    }
200
201    /// Checks if the connection is broken and should not be reused
202    ///
203    /// This method should return only contain a fast non-blocking check
204    /// if the connection is considered to be broken or not. See
205    /// [ManageConnection::has_broken] for details.
206    ///
207    /// The default implementation uses
208    /// [TransactionManager::is_broken_transaction_manager].
209    fn is_broken(&mut self) -> bool {
210        Self::TransactionManager::is_broken_transaction_manager(self)
211    }
212}