diesel_async/pooled_connection/
mod.rs1use crate::statement_cache::CacheSize;
9use crate::{AsyncConnection, SimpleAsyncConnection};
10use crate::{TransactionManager, UpdateAndFetchResults};
11use diesel::associations::HasTable;
12use diesel::connection::Instrumentation;
13use diesel::QueryResult;
14use futures_core::future::BoxFuture;
15use futures_util::FutureExt;
16use std::borrow::Cow;
17use std::fmt;
18use std::future::Future;
19use std::ops::DerefMut;
20
21#[cfg(feature = "bb8")]
22pub mod bb8;
23#[cfg(feature = "deadpool")]
24pub mod deadpool;
25#[cfg(feature = "mobc")]
26pub mod mobc;
27
28#[derive(Debug)]
30pub enum PoolError {
31 ConnectionError(diesel::result::ConnectionError),
33
34 QueryError(diesel::result::Error),
36}
37
38impl fmt::Display for PoolError {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match *self {
41 PoolError::ConnectionError(ref e) => e.fmt(f),
42 PoolError::QueryError(ref e) => e.fmt(f),
43 }
44 }
45}
46
47impl std::error::Error for PoolError {}
48
49pub type SetupCallback<C> =
51 Box<dyn Fn(&str) -> BoxFuture<diesel::ConnectionResult<C>> + Send + Sync>;
52
53pub type RecycleCheckCallback<C> = dyn Fn(&mut C) -> BoxFuture<QueryResult<()>> + Send + Sync;
55
56#[derive(Default)]
58pub enum RecyclingMethod<C> {
59 Fast,
65 #[default]
69 Verified,
70 CustomQuery(Cow<'static, str>),
72 CustomFunction(Box<RecycleCheckCallback<C>>),
76}
77
78impl<C: fmt::Debug> fmt::Debug for RecyclingMethod<C> {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 match self {
81 Self::Fast => write!(f, "Fast"),
82 Self::Verified => write!(f, "Verified"),
83 Self::CustomQuery(arg0) => f.debug_tuple("CustomQuery").field(arg0).finish(),
84 Self::CustomFunction(_) => f.debug_tuple("CustomFunction").finish(),
85 }
86 }
87}
88
89#[non_exhaustive]
95pub struct ManagerConfig<C> {
96 pub recycling_method: RecyclingMethod<C>,
98 pub custom_setup: SetupCallback<C>,
104}
105
106impl<C> Default for ManagerConfig<C>
107where
108 C: AsyncConnection + 'static,
109{
110 fn default() -> Self {
111 Self {
112 recycling_method: Default::default(),
113 custom_setup: Box::new(|url| C::establish(url).boxed()),
114 }
115 }
116}
117
118#[allow(dead_code)]
125pub struct AsyncDieselConnectionManager<C> {
126 connection_url: String,
127 manager_config: ManagerConfig<C>,
128}
129
130impl<C> fmt::Debug for AsyncDieselConnectionManager<C> {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(
133 f,
134 "AsyncDieselConnectionManager<{}>",
135 std::any::type_name::<C>()
136 )
137 }
138}
139
140impl<C> AsyncDieselConnectionManager<C>
141where
142 C: AsyncConnection + 'static,
143{
144 #[must_use]
147 pub fn new(connection_url: impl Into<String>) -> Self
148 where
149 C: AsyncConnection + 'static,
150 {
151 Self::new_with_config(connection_url, Default::default())
152 }
153
154 #[must_use]
158 pub fn new_with_config(
159 connection_url: impl Into<String>,
160 manager_config: ManagerConfig<C>,
161 ) -> Self {
162 Self {
163 connection_url: connection_url.into(),
164 manager_config,
165 }
166 }
167}
168
169impl<C> SimpleAsyncConnection for C
170where
171 C: DerefMut + Send,
172 C::Target: SimpleAsyncConnection + Send,
173{
174 async fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> {
175 let conn = self.deref_mut();
176 conn.batch_execute(query).await
177 }
178}
179
180impl<C> AsyncConnection for C
181where
182 C: DerefMut + Send,
183 C::Target: AsyncConnection,
184{
185 type ExecuteFuture<'conn, 'query> =
186 <C::Target as AsyncConnection>::ExecuteFuture<'conn, 'query>;
187 type LoadFuture<'conn, 'query> = <C::Target as AsyncConnection>::LoadFuture<'conn, 'query>;
188 type Stream<'conn, 'query> = <C::Target as AsyncConnection>::Stream<'conn, 'query>;
189 type Row<'conn, 'query> = <C::Target as AsyncConnection>::Row<'conn, 'query>;
190
191 type Backend = <C::Target as AsyncConnection>::Backend;
192
193 type TransactionManager =
194 PoolTransactionManager<<C::Target as AsyncConnection>::TransactionManager>;
195
196 async fn establish(_database_url: &str) -> diesel::ConnectionResult<Self> {
197 Err(diesel::result::ConnectionError::BadConnection(
198 String::from("Cannot directly establish a pooled connection"),
199 ))
200 }
201
202 fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
203 where
204 T: diesel::query_builder::AsQuery + 'query,
205 T::Query: diesel::query_builder::QueryFragment<Self::Backend>
206 + diesel::query_builder::QueryId
207 + 'query,
208 {
209 let conn = self.deref_mut();
210 conn.load(source)
211 }
212
213 fn execute_returning_count<'conn, 'query, T>(
214 &'conn mut self,
215 source: T,
216 ) -> Self::ExecuteFuture<'conn, 'query>
217 where
218 T: diesel::query_builder::QueryFragment<Self::Backend>
219 + diesel::query_builder::QueryId
220 + 'query,
221 {
222 let conn = self.deref_mut();
223 conn.execute_returning_count(source)
224 }
225
226 fn transaction_state(
227 &mut self,
228 ) -> &mut <Self::TransactionManager as crate::transaction_manager::TransactionManager<Self>>::TransactionStateData{
229 let conn = self.deref_mut();
230 conn.transaction_state()
231 }
232
233 async fn begin_test_transaction(&mut self) -> diesel::QueryResult<()> {
234 self.deref_mut().begin_test_transaction().await
235 }
236
237 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
238 self.deref_mut().instrumentation()
239 }
240
241 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
242 self.deref_mut().set_instrumentation(instrumentation);
243 }
244
245 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
246 self.deref_mut().set_prepared_statement_cache_size(size);
247 }
248}
249
250#[doc(hidden)]
251#[allow(missing_debug_implementations)]
252pub struct PoolTransactionManager<TM>(std::marker::PhantomData<TM>);
253
254impl<C, TM> TransactionManager<C> for PoolTransactionManager<TM>
255where
256 C: DerefMut + Send,
257 C::Target: AsyncConnection<TransactionManager = TM>,
258 TM: TransactionManager<C::Target>,
259{
260 type TransactionStateData = TM::TransactionStateData;
261
262 async fn begin_transaction(conn: &mut C) -> diesel::QueryResult<()> {
263 TM::begin_transaction(&mut **conn).await
264 }
265
266 async fn rollback_transaction(conn: &mut C) -> diesel::QueryResult<()> {
267 TM::rollback_transaction(&mut **conn).await
268 }
269
270 async fn commit_transaction(conn: &mut C) -> diesel::QueryResult<()> {
271 TM::commit_transaction(&mut **conn).await
272 }
273
274 fn transaction_manager_status_mut(
275 conn: &mut C,
276 ) -> &mut diesel::connection::TransactionManagerStatus {
277 TM::transaction_manager_status_mut(&mut **conn)
278 }
279
280 fn is_broken_transaction_manager(conn: &mut C) -> bool {
281 TM::is_broken_transaction_manager(&mut **conn)
282 }
283}
284
285impl<Changes, Output, Conn> UpdateAndFetchResults<Changes, Output> for Conn
286where
287 Conn: DerefMut + Send,
288 Changes: diesel::prelude::Identifiable + HasTable + Send,
289 Conn::Target: UpdateAndFetchResults<Changes, Output>,
290{
291 fn update_and_fetch<'conn, 'changes>(
292 &'conn mut self,
293 changeset: Changes,
294 ) -> BoxFuture<'changes, QueryResult<Output>>
295 where
296 Changes: 'changes,
297 'conn: 'changes,
298 Self: 'changes,
299 {
300 self.deref_mut().update_and_fetch(changeset)
301 }
302}
303
304#[derive(diesel::query_builder::QueryId)]
305struct CheckConnectionQuery;
306
307impl<DB> diesel::query_builder::QueryFragment<DB> for CheckConnectionQuery
308where
309 DB: diesel::backend::Backend,
310{
311 fn walk_ast<'b>(
312 &'b self,
313 mut pass: diesel::query_builder::AstPass<'_, 'b, DB>,
314 ) -> diesel::QueryResult<()> {
315 pass.push_sql("SELECT 1");
316 Ok(())
317 }
318}
319
320impl diesel::query_builder::Query for CheckConnectionQuery {
321 type SqlType = diesel::sql_types::Integer;
322}
323
324impl<C> diesel::query_dsl::RunQueryDsl<C> for CheckConnectionQuery {}
325
326#[doc(hidden)]
327pub trait PoolableConnection: AsyncConnection {
328 fn ping(
333 &mut self,
334 config: &RecyclingMethod<Self>,
335 ) -> impl Future<Output = diesel::QueryResult<()>> + Send
336 where
337 for<'a> Self: 'a,
338 diesel::dsl::select<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
339 crate::methods::ExecuteDsl<Self>,
340 diesel::query_builder::SqlQuery: crate::methods::ExecuteDsl<Self>,
341 {
342 use crate::run_query_dsl::RunQueryDsl;
343 use diesel::IntoSql;
344
345 async move {
346 match config {
347 RecyclingMethod::Fast => Ok(()),
348 RecyclingMethod::Verified => {
349 diesel::select(1_i32.into_sql::<diesel::sql_types::Integer>())
350 .execute(self)
351 .await
352 .map(|_| ())
353 }
354 RecyclingMethod::CustomQuery(query) => diesel::sql_query(query.as_ref())
355 .execute(self)
356 .await
357 .map(|_| ()),
358 RecyclingMethod::CustomFunction(c) => c(self).await,
359 }
360 }
361 }
362
363 fn is_broken(&mut self) -> bool {
372 Self::TransactionManager::is_broken_transaction_manager(self)
373 }
374}