diesel_async/pooled_connection/
mod.rs

//! This module contains support for using diesel-async with
//! various async Rust connection pooling solutions
3//!
4//! See the concrete pool implementations for examples:
5//! * [deadpool](self::deadpool)
6//! * [bb8](self::bb8)
7//! * [mobc](self::mobc)
8use crate::statement_cache::CacheSize;
9use crate::{AsyncConnection, SimpleAsyncConnection};
10use crate::{TransactionManager, UpdateAndFetchResults};
11use diesel::associations::HasTable;
12use diesel::connection::Instrumentation;
13use diesel::QueryResult;
14use futures_core::future::BoxFuture;
15use futures_util::FutureExt;
16use std::borrow::Cow;
17use std::fmt;
18use std::future::Future;
19use std::ops::DerefMut;
20
21#[cfg(feature = "bb8")]
22pub mod bb8;
23#[cfg(feature = "deadpool")]
24pub mod deadpool;
25#[cfg(feature = "mobc")]
26pub mod mobc;
27
28/// The error used when managing connections with `deadpool`.
29#[derive(Debug)]
30pub enum PoolError {
31    /// An error occurred establishing the connection
32    ConnectionError(diesel::result::ConnectionError),
33
34    /// An error occurred pinging the database
35    QueryError(diesel::result::Error),
36}
37
38impl fmt::Display for PoolError {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match *self {
41            PoolError::ConnectionError(ref e) => e.fmt(f),
42            PoolError::QueryError(ref e) => e.fmt(f),
43        }
44    }
45}
46
47impl std::error::Error for PoolError {}
48
49/// Type of the custom setup closure passed to [`ManagerConfig::custom_setup`]
50pub type SetupCallback<C> =
51    Box<dyn Fn(&str) -> BoxFuture<diesel::ConnectionResult<C>> + Send + Sync>;
52
53/// Type of the recycle check callback for the [`RecyclingMethod::CustomFunction`] variant
54pub type RecycleCheckCallback<C> = dyn Fn(&mut C) -> BoxFuture<QueryResult<()>> + Send + Sync;
55
56/// Possible methods of how a connection is recycled.
57#[derive(Default)]
58pub enum RecyclingMethod<C> {
59    /// Only check for open transactions when recycling existing connections
60    /// Unless you have special needs this is a safe choice.
61    ///
62    /// If the database connection is closed you will recieve an error on the first place
63    /// you actually try to use the connection
64    Fast,
65    /// In addition to checking for open transactions a test query is executed
66    ///
67    /// This is slower, but guarantees that the database connection is ready to be used.
68    #[default]
69    Verified,
70    /// Like `Verified` but with a custom query
71    CustomQuery(Cow<'static, str>),
72    /// Like `Verified` but with a custom callback that allows to perform more checks
73    ///
74    /// The connection is only recycled if the callback returns `Ok(())`
75    CustomFunction(Box<RecycleCheckCallback<C>>),
76}
77
78impl<C: fmt::Debug> fmt::Debug for RecyclingMethod<C> {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        match self {
81            Self::Fast => write!(f, "Fast"),
82            Self::Verified => write!(f, "Verified"),
83            Self::CustomQuery(arg0) => f.debug_tuple("CustomQuery").field(arg0).finish(),
84            Self::CustomFunction(_) => f.debug_tuple("CustomFunction").finish(),
85        }
86    }
87}
88
89/// Configuration object for a Manager.
90///
91/// This makes it possible to specify which [`RecyclingMethod`]
92/// should be used when retrieving existing objects from the `Pool`
93/// and it allows to provide a custom setup function.
94#[non_exhaustive]
95pub struct ManagerConfig<C> {
96    /// Method of how a connection is recycled. See [RecyclingMethod].
97    pub recycling_method: RecyclingMethod<C>,
98    /// Construct a new connection manger
99    /// with a custom setup procedure
100    ///
101    /// This can be used to for example establish a SSL secured
102    /// postgres connection
103    pub custom_setup: SetupCallback<C>,
104}
105
106impl<C> Default for ManagerConfig<C>
107where
108    C: AsyncConnection + 'static,
109{
110    fn default() -> Self {
111        Self {
112            recycling_method: Default::default(),
113            custom_setup: Box::new(|url| C::establish(url).boxed()),
114        }
115    }
116}
117
118/// An connection manager for use with diesel-async.
119///
120/// See the concrete pool implementations for examples:
121/// * [deadpool](self::deadpool)
122/// * [bb8](self::bb8)
123/// * [mobc](self::mobc)
124#[allow(dead_code)]
125pub struct AsyncDieselConnectionManager<C> {
126    connection_url: String,
127    manager_config: ManagerConfig<C>,
128}
129
130impl<C> fmt::Debug for AsyncDieselConnectionManager<C> {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(
133            f,
134            "AsyncDieselConnectionManager<{}>",
135            std::any::type_name::<C>()
136        )
137    }
138}
139
140impl<C> AsyncDieselConnectionManager<C>
141where
142    C: AsyncConnection + 'static,
143{
144    /// Returns a new connection manager,
145    /// which establishes connections to the given database URL.
146    #[must_use]
147    pub fn new(connection_url: impl Into<String>) -> Self
148    where
149        C: AsyncConnection + 'static,
150    {
151        Self::new_with_config(connection_url, Default::default())
152    }
153
154    /// Returns a new connection manager,
155    /// which establishes connections with the given database URL
156    /// and that uses the specified configuration
157    #[must_use]
158    pub fn new_with_config(
159        connection_url: impl Into<String>,
160        manager_config: ManagerConfig<C>,
161    ) -> Self {
162        Self {
163            connection_url: connection_url.into(),
164            manager_config,
165        }
166    }
167}
168
169impl<C> SimpleAsyncConnection for C
170where
171    C: DerefMut + Send,
172    C::Target: SimpleAsyncConnection + Send,
173{
174    async fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> {
175        let conn = self.deref_mut();
176        conn.batch_execute(query).await
177    }
178}
179
180impl<C> AsyncConnection for C
181where
182    C: DerefMut + Send,
183    C::Target: AsyncConnection,
184{
185    type ExecuteFuture<'conn, 'query> =
186        <C::Target as AsyncConnection>::ExecuteFuture<'conn, 'query>;
187    type LoadFuture<'conn, 'query> = <C::Target as AsyncConnection>::LoadFuture<'conn, 'query>;
188    type Stream<'conn, 'query> = <C::Target as AsyncConnection>::Stream<'conn, 'query>;
189    type Row<'conn, 'query> = <C::Target as AsyncConnection>::Row<'conn, 'query>;
190
191    type Backend = <C::Target as AsyncConnection>::Backend;
192
193    type TransactionManager =
194        PoolTransactionManager<<C::Target as AsyncConnection>::TransactionManager>;
195
196    async fn establish(_database_url: &str) -> diesel::ConnectionResult<Self> {
197        Err(diesel::result::ConnectionError::BadConnection(
198            String::from("Cannot directly establish a pooled connection"),
199        ))
200    }
201
202    fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
203    where
204        T: diesel::query_builder::AsQuery + 'query,
205        T::Query: diesel::query_builder::QueryFragment<Self::Backend>
206            + diesel::query_builder::QueryId
207            + 'query,
208    {
209        let conn = self.deref_mut();
210        conn.load(source)
211    }
212
213    fn execute_returning_count<'conn, 'query, T>(
214        &'conn mut self,
215        source: T,
216    ) -> Self::ExecuteFuture<'conn, 'query>
217    where
218        T: diesel::query_builder::QueryFragment<Self::Backend>
219            + diesel::query_builder::QueryId
220            + 'query,
221    {
222        let conn = self.deref_mut();
223        conn.execute_returning_count(source)
224    }
225
226    fn transaction_state(
227        &mut self,
228    ) -> &mut <Self::TransactionManager as crate::transaction_manager::TransactionManager<Self>>::TransactionStateData{
229        let conn = self.deref_mut();
230        conn.transaction_state()
231    }
232
233    async fn begin_test_transaction(&mut self) -> diesel::QueryResult<()> {
234        self.deref_mut().begin_test_transaction().await
235    }
236
237    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
238        self.deref_mut().instrumentation()
239    }
240
241    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
242        self.deref_mut().set_instrumentation(instrumentation);
243    }
244
245    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
246        self.deref_mut().set_prepared_statement_cache_size(size);
247    }
248}
249
/// Zero-sized adapter carrying the wrapped transaction manager type `TM`.
///
/// It stores no data; `TM` is only recorded through `PhantomData`.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PoolTransactionManager<TM>(std::marker::PhantomData<TM>);
253
254impl<C, TM> TransactionManager<C> for PoolTransactionManager<TM>
255where
256    C: DerefMut + Send,
257    C::Target: AsyncConnection<TransactionManager = TM>,
258    TM: TransactionManager<C::Target>,
259{
260    type TransactionStateData = TM::TransactionStateData;
261
262    async fn begin_transaction(conn: &mut C) -> diesel::QueryResult<()> {
263        TM::begin_transaction(&mut **conn).await
264    }
265
266    async fn rollback_transaction(conn: &mut C) -> diesel::QueryResult<()> {
267        TM::rollback_transaction(&mut **conn).await
268    }
269
270    async fn commit_transaction(conn: &mut C) -> diesel::QueryResult<()> {
271        TM::commit_transaction(&mut **conn).await
272    }
273
274    fn transaction_manager_status_mut(
275        conn: &mut C,
276    ) -> &mut diesel::connection::TransactionManagerStatus {
277        TM::transaction_manager_status_mut(&mut **conn)
278    }
279
280    fn is_broken_transaction_manager(conn: &mut C) -> bool {
281        TM::is_broken_transaction_manager(&mut **conn)
282    }
283}
284
285impl<Changes, Output, Conn> UpdateAndFetchResults<Changes, Output> for Conn
286where
287    Conn: DerefMut + Send,
288    Changes: diesel::prelude::Identifiable + HasTable + Send,
289    Conn::Target: UpdateAndFetchResults<Changes, Output>,
290{
291    fn update_and_fetch<'conn, 'changes>(
292        &'conn mut self,
293        changeset: Changes,
294    ) -> BoxFuture<'changes, QueryResult<Output>>
295    where
296        Changes: 'changes,
297        'conn: 'changes,
298        Self: 'changes,
299    {
300        self.deref_mut().update_and_fetch(changeset)
301    }
302}
303
304#[derive(diesel::query_builder::QueryId)]
305struct CheckConnectionQuery;
306
307impl<DB> diesel::query_builder::QueryFragment<DB> for CheckConnectionQuery
308where
309    DB: diesel::backend::Backend,
310{
311    fn walk_ast<'b>(
312        &'b self,
313        mut pass: diesel::query_builder::AstPass<'_, 'b, DB>,
314    ) -> diesel::QueryResult<()> {
315        pass.push_sql("SELECT 1");
316        Ok(())
317    }
318}
319
320impl diesel::query_builder::Query for CheckConnectionQuery {
321    type SqlType = diesel::sql_types::Integer;
322}
323
324impl<C> diesel::query_dsl::RunQueryDsl<C> for CheckConnectionQuery {}
325
326#[doc(hidden)]
327pub trait PoolableConnection: AsyncConnection {
328    /// Check if a connection is still valid
329    ///
330    /// The default implementation will perform a check based on the provided
331    /// recycling method variant
332    fn ping(
333        &mut self,
334        config: &RecyclingMethod<Self>,
335    ) -> impl Future<Output = diesel::QueryResult<()>> + Send
336    where
337        for<'a> Self: 'a,
338        diesel::dsl::select<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
339            crate::methods::ExecuteDsl<Self>,
340        diesel::query_builder::SqlQuery: crate::methods::ExecuteDsl<Self>,
341    {
342        use crate::run_query_dsl::RunQueryDsl;
343        use diesel::IntoSql;
344
345        async move {
346            match config {
347                RecyclingMethod::Fast => Ok(()),
348                RecyclingMethod::Verified => {
349                    diesel::select(1_i32.into_sql::<diesel::sql_types::Integer>())
350                        .execute(self)
351                        .await
352                        .map(|_| ())
353                }
354                RecyclingMethod::CustomQuery(query) => diesel::sql_query(query.as_ref())
355                    .execute(self)
356                    .await
357                    .map(|_| ()),
358                RecyclingMethod::CustomFunction(c) => c(self).await,
359            }
360        }
361    }
362
363    /// Checks if the connection is broken and should not be reused
364    ///
365    /// This method should return only contain a fast non-blocking check
366    /// if the connection is considered to be broken or not. See
367    /// [ManageConnection::has_broken] for details.
368    ///
369    /// The default implementation uses
370    /// [TransactionManager::is_broken_transaction_manager].
371    fn is_broken(&mut self) -> bool {
372        Self::TransactionManager::is_broken_transaction_manager(self)
373    }
374}