diesel_async/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2//! Diesel-async provides async variants of diesel related query functionality
3//!
4//! diesel-async is an extension to diesel itself. It is designed to be used together
5//! with the main diesel crate. It only provides async variants of core diesel traits,
6//! that perform actual io-work.
//! This includes async counterparts of the following traits:
8//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.3.x/diesel/prelude/trait.RunQueryDsl.html)
9//! -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl)
10//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.3.x/diesel/connection/trait.Connection.html)
11//! -> [`diesel_async::AsyncConnection`](crate::AsyncConnection)
12//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.3.x/diesel/query_dsl/trait.UpdateAndFetchResults.html)
13//! -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults)
14//!
//! These traits closely mirror their diesel counterparts while providing async functionality.
16//!
17//! In addition to these core traits 3 fully async connection implementations are provided
18//! by diesel-async:
19//!
20//! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature)
21//! * [`AsyncPgConnection`] (enabled by the `postgres` feature)
22//! * [`SyncConnectionWrapper`](sync_connection_wrapper::SyncConnectionWrapper) (enabled by the `sync-connection-wrapper`/`sqlite` feature)
23//!
24//! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait
25//! method calls and connections with their async counterparts.
26//!
27//! ```rust
28//! # include!("./doctest_setup.rs");
29//! #
30//! use diesel::prelude::*;
31//! use diesel_async::{RunQueryDsl, AsyncConnection};
32//!
33//! diesel::table! {
34//! users(id) {
35//! id -> Integer,
36//! name -> Text,
37//! }
38//! }
39//! #
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! # run_test().await;
43//! # }
44//! #
45//! # async fn run_test() -> QueryResult<()> {
46//!
47//! use crate::users::dsl::*;
48//!
49//! # let mut connection = establish_connection().await;
50//! # /*
51//! let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
52//! # */
53//! let data = users
54//! // use ordinary diesel query dsl here
55//! .filter(id.gt(0))
56//! // execute the query via the provided
57//! // async variant of `diesel_async::RunQueryDsl`
58//! .load::<(i32, String)>(&mut connection)
59//! .await?;
60//! let expected_data = vec![
61//! (1, String::from("Sean")),
62//! (2, String::from("Tess")),
63//! ];
64//! assert_eq!(expected_data, data);
65//! # Ok(())
66//! # }
67//! ```
68//!
69//! ## Crate features:
70//!
71//! * `postgres`: Enables the [`AsyncPgConnection`] implementation
72//! * `mysql`: Enables the [`AsyncMysqlConnection`] implementation
73//! * `sqlite`: Enables the [`SyncConnectionWrapper`](crate::sync_connection_wrapper::SyncConnectionWrapper)
74//! and everything required to work with SQLite
//! * `sync-connection-wrapper`: Enables the
//!   [`SyncConnectionWrapper`](crate::sync_connection_wrapper::SyncConnectionWrapper) which allows
//!   wrapping sync connections from [`diesel`] in an async connection wrapper
//! * `async-connection-wrapper`: Enables the [`AsyncConnectionWrapper`](crate::async_connection_wrapper::AsyncConnectionWrapper)
//!   which allows using connection implementations from this crate as sync
//!   [`diesel::Connection`]
81//! * `migrations`: Enables the [`AsyncMigrationHarness`] to execute migrations via
82//! [`diesel_migrations`]
83//! * `pool`: Enables general support for connection pools
84//! * `r2d2`: Enables support for pooling via the [`r2d2`] crate
85//! * `bb8`: Enables support for pooling via the [`bb8`] crate
86//! * `mobc`: Enables support for pooling via the [`mobc`] crate
87//! * `deadpool`: Enables support for pooling via the [`deadpool`] crate
88
89#![warn(
90 missing_docs,
91 clippy::cast_possible_wrap,
92 clippy::cast_possible_truncation,
93 clippy::cast_sign_loss
94)]
95
96use diesel::backend::Backend;
97use diesel::connection::{CacheSize, Instrumentation};
98use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
99use diesel::row::Row;
100use diesel::{ConnectionResult, QueryResult};
101use futures_core::Stream;
102use futures_util::FutureExt;
103use std::fmt::Debug;
104use std::future::Future;
105use transaction_manager::AsyncFunc;
106
107#[cfg(feature = "async-connection-wrapper")]
108pub mod async_connection_wrapper;
109mod deref_connection;
110#[cfg(feature = "migrations")]
111mod migrations;
112#[cfg(feature = "mysql")]
113mod mysql;
114#[cfg(feature = "postgres")]
115pub mod pg;
116#[cfg(feature = "pool")]
117pub mod pooled_connection;
118mod run_query_dsl;
119#[cfg(any(feature = "postgres", feature = "mysql"))]
120mod stmt_cache;
121#[cfg(feature = "sync-connection-wrapper")]
122pub mod sync_connection_wrapper;
123mod transaction_manager;
124
125#[cfg(feature = "mysql")]
126#[doc(inline)]
127pub use self::mysql::AsyncMysqlConnection;
128#[cfg(feature = "mysql")]
129#[doc(inline)]
130pub use self::mysql::MysqlCancelToken;
131#[cfg(feature = "postgres")]
132#[doc(inline)]
133pub use self::pg::AsyncPgConnection;
134#[doc(inline)]
135pub use self::run_query_dsl::*;
136
137#[doc(inline)]
138#[cfg(feature = "migrations")]
139pub use self::migrations::AsyncMigrationHarness;
140#[doc(inline)]
141pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
142
143/// Perform simple operations on a backend.
144///
145/// You should likely use [`AsyncConnection`] instead.
146pub trait SimpleAsyncConnection {
147 /// Execute multiple SQL statements within the same string.
148 ///
149 /// This function is used to execute migrations,
150 /// which may contain more than one SQL statement.
151 fn batch_execute(&mut self, query: &str) -> impl Future<Output = QueryResult<()>> + Send;
152}
153
154/// Core trait for an async database connection
155pub trait AsyncConnectionCore: SimpleAsyncConnection + Send {
156 /// The future returned by `AsyncConnection::execute`
157 type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
158 /// The future returned by `AsyncConnection::load`
159 type LoadFuture<'conn, 'query>: Future<Output = QueryResult<Self::Stream<'conn, 'query>>> + Send;
160 /// The inner stream returned by `AsyncConnection::load`
161 type Stream<'conn, 'query>: Stream<Item = QueryResult<Self::Row<'conn, 'query>>> + Send;
162 /// The row type used by the stream returned by `AsyncConnection::load`
163 type Row<'conn, 'query>: Row<'conn, Self::Backend>;
164
165 /// The backend this type connects to
166 type Backend: Backend;
167
168 #[doc(hidden)]
169 fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
170 where
171 T: AsQuery + 'query,
172 T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
173
174 #[doc(hidden)]
175 fn execute_returning_count<'conn, 'query, T>(
176 &'conn mut self,
177 source: T,
178 ) -> Self::ExecuteFuture<'conn, 'query>
179 where
180 T: QueryFragment<Self::Backend> + QueryId + 'query;
181
182 // These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
183 // compile without a `where Self: '_` clause. This is needed the because bound causes
184 // lifetime issues when using `transaction()` with generic `AsyncConnection`s.
185 //
186 // See: https://github.com/rust-lang/rust/issues/87479
187 #[doc(hidden)]
188 fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
189 #[doc(hidden)]
190 fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
191}
192
193/// An async connection to a database
194///
195/// This trait represents an async database connection. It can be used to query the database through
196/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
197/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
198pub trait AsyncConnection: AsyncConnectionCore + Sized {
199 #[doc(hidden)]
200 type TransactionManager: TransactionManager<Self>;
201
202 /// Establishes a new connection to the database
203 ///
204 /// The argument to this method and the method's behavior varies by backend.
205 /// See the documentation for that backend's connection class
206 /// for details about what it accepts and how it behaves.
207 fn establish(database_url: &str) -> impl Future<Output = ConnectionResult<Self>> + Send;
208
209 /// Executes the given function inside of a database transaction
210 ///
211 /// This function executes the provided closure `f` inside a database
212 /// transaction. If there is already an open transaction for the current
213 /// connection savepoints will be used instead. The connection is committed if
214 /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
215 /// For both cases the original result value will be returned from this function.
216 ///
217 /// If the transaction fails to commit due to a `SerializationFailure` or a
218 /// `ReadOnlyTransaction` a rollback will be attempted.
219 /// If the rollback fails, the error will be returned in a
220 /// [`Error::RollbackErrorOnCommit`](diesel::result::Error::RollbackErrorOnCommit),
221 /// from which you will be able to extract both the original commit error and
222 /// the rollback error.
223 /// In addition, the connection will be considered broken
224 /// as it contains a uncommitted unabortable open transaction. Any further
225 /// interaction with the transaction system will result in an returned error
226 /// in this case.
227 ///
228 /// If the closure returns an `Err(_)` and the rollback fails the function
229 /// will return that rollback error directly, and the transaction manager will
230 /// be marked as broken as it contains a uncommitted unabortable open transaction.
231 ///
232 /// If a nested transaction fails to release the corresponding savepoint
233 /// the error will be returned directly.
234 ///
235 /// **WARNING:** Canceling the returned future does currently **not**
236 /// close an already open transaction. You may end up with a connection
237 /// containing a dangling transaction.
238 ///
239 /// # Example
240 ///
241 /// ```rust
242 /// # include!("doctest_setup.rs");
243 /// use diesel::result::Error;
244 /// use diesel_async::{RunQueryDsl, AsyncConnection};
245 ///
246 /// # #[tokio::main(flavor = "current_thread")]
247 /// # async fn main() {
248 /// # run_test().await.unwrap();
249 /// # }
250 /// #
251 /// # async fn run_test() -> QueryResult<()> {
252 /// # use schema::users::dsl::*;
253 /// # let conn = &mut establish_connection().await;
254 /// conn.transaction::<_, Error, _>(async |conn| {
255 /// diesel::insert_into(users)
256 /// .values(name.eq("Ruby"))
257 /// .execute(conn)
258 /// .await?;
259 ///
260 /// let all_names = users.select(name).load::<String>(conn).await?;
261 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
262 ///
263 /// Ok(())
264 /// }).await?;
265 ///
266 /// conn.transaction::<(), _, _>(async |conn| {
267 /// diesel::insert_into(users)
268 /// .values(name.eq("Pascal"))
269 /// .execute(conn)
270 /// .await?;
271 ///
272 /// let all_names = users.select(name).load::<String>(conn).await?;
273 /// assert_eq!(vec!["Sean", "Tess", "Ruby", "Pascal"], all_names);
274 ///
275 /// // If we want to roll back the transaction, but don't have an
276 /// // actual error to return, we can return `RollbackTransaction`.
277 /// Err(Error::RollbackTransaction)
278 /// }).await;
279 ///
280 /// let all_names = users.select(name).load::<String>(conn).await?;
281 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
282 /// # Ok(())
283 /// # }
284 /// ```
285 fn transaction<'a, 'conn, R, E, F>(
286 &'conn mut self,
287 callback: F,
288 ) -> impl Future<Output = Result<R, E>> + Send + 'conn
289 where
290 for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
291 + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
292 + Send
293 + 'a,
294 E: From<diesel::result::Error> + Send + 'a,
295 R: Send + 'a,
296 'a: 'conn,
297 {
298 Self::TransactionManager::transaction(self, callback)
299 }
300
301 /// Creates a transaction that will never be committed. This is useful for
302 /// tests. Panics if called while inside of a transaction or
303 /// if called with a connection containing a broken transaction
304 fn begin_test_transaction(&mut self) -> impl Future<Output = QueryResult<()>> + Send {
305 use diesel::connection::TransactionManagerStatus;
306
307 async {
308 match Self::TransactionManager::transaction_manager_status_mut(self) {
309 TransactionManagerStatus::Valid(valid_status) => {
310 assert_eq!(None, valid_status.transaction_depth())
311 }
312 TransactionManagerStatus::InError => panic!("Transaction manager in error"),
313 };
314 Self::TransactionManager::begin_transaction(self).await?;
315 // set the test transaction flag
316 // to prevent that this connection gets dropped in connection pools
317 // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
318 // to prevent modifications to the schema
319 Self::TransactionManager::transaction_manager_status_mut(self)
320 .set_test_transaction_flag();
321 Ok(())
322 }
323 }
324
325 /// Executes the given function inside a transaction, but does not commit
326 /// it. Panics if the given function returns an error.
327 ///
328 /// # Example
329 ///
330 /// ```rust
331 /// # include!("doctest_setup.rs");
332 /// use diesel::result::Error;
333 /// use diesel_async::{RunQueryDsl, AsyncConnection};
334 ///
335 /// # #[tokio::main(flavor = "current_thread")]
336 /// # async fn main() {
337 /// # run_test().await.unwrap();
338 /// # }
339 /// #
340 /// # async fn run_test() -> QueryResult<()> {
341 /// # use schema::users::dsl::*;
342 /// # let conn = &mut establish_connection().await;
343 /// conn.test_transaction::<_, Error, _>(async |conn| {
344 /// diesel::insert_into(users)
345 /// .values(name.eq("Ruby"))
346 /// .execute(conn)
347 /// .await?;
348 ///
349 /// let all_names = users.select(name).load::<String>(conn).await?;
350 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
351 ///
352 /// Ok(())
353 /// }).await;
354 ///
355 /// // Even though we returned `Ok`, the transaction wasn't committed.
356 /// let all_names = users.select(name).load::<String>(conn).await?;
357 /// assert_eq!(vec!["Sean", "Tess"], all_names);
358 /// # Ok(())
359 /// # }
360 /// ```
361 fn test_transaction<'conn, 'a, R, E, F>(
362 &'conn mut self,
363 f: F,
364 ) -> impl Future<Output = R> + Send + 'conn
365 where
366 for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
367 + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
368 + Send
369 + 'a,
370 E: Debug + Send + 'a,
371 R: Send + 'a,
372 'a: 'conn,
373 {
374 use futures_util::TryFutureExt;
375 let (user_result_tx, user_result_rx) = std::sync::mpsc::channel();
376 self.transaction::<R, _, _>(async |conn| {
377 f(conn)
378 .map_err(|_| diesel::result::Error::RollbackTransaction)
379 .and_then(move |r| {
380 let _ = user_result_tx.send(r);
381 std::future::ready(Err(diesel::result::Error::RollbackTransaction))
382 })
383 .await
384 })
385 .then(move |_r| {
386 let r = user_result_rx
387 .try_recv()
388 .expect("Transaction did not succeed");
389 std::future::ready(r)
390 })
391 }
392
393 #[doc(hidden)]
394 fn transaction_state(
395 &mut self,
396 ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
397
398 #[doc(hidden)]
399 fn instrumentation(&mut self) -> &mut dyn Instrumentation;
400
401 /// Set a specific [`Instrumentation`] implementation for this connection
402 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation);
403
404 /// Set the prepared statement cache size to [`CacheSize`] for this connection
405 fn set_prepared_statement_cache_size(&mut self, size: CacheSize);
406}