diesel_async/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2//! Diesel-async provides async variants of diesel related query functionality
3//!
4//! diesel-async is an extension to diesel itself. It is designed to be used together
5//! with the main diesel crate. It only provides async variants of core diesel traits,
6//! that perform actual io-work.
//! This includes async counterparts of the following traits:
8//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.3.x/diesel/prelude/trait.RunQueryDsl.html)
9//!   -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl)
10//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.3.x/diesel/connection/trait.Connection.html)
11//!   -> [`diesel_async::AsyncConnection`](crate::AsyncConnection)
12//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.3.x/diesel/query_dsl/trait.UpdateAndFetchResults.html)
13//!   -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults)
14//!
//! These traits closely mirror their diesel counterparts while providing async functionality.
16//!
17//! In addition to these core traits 3 fully async connection implementations are provided
18//! by diesel-async:
19//!
20//! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature)
21//! * [`AsyncPgConnection`] (enabled by the `postgres` feature)
22//! * [`SyncConnectionWrapper`](sync_connection_wrapper::SyncConnectionWrapper) (enabled by the `sync-connection-wrapper`/`sqlite` feature)
23//!
24//! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait
25//! method calls and connections with their async counterparts.
26//!
27//! ```rust
28//! # include!("./doctest_setup.rs");
29//! #
30//! use diesel::prelude::*;
31//! use diesel_async::{RunQueryDsl, AsyncConnection};
32//!
33//! diesel::table! {
34//!    users(id) {
35//!        id -> Integer,
36//!        name -> Text,
37//!    }
38//! }
39//! #
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! #     run_test().await;
43//! # }
44//! #
45//! # async fn run_test() -> QueryResult<()> {
46//!
47//! use crate::users::dsl::*;
48//!
49//! # let mut connection = establish_connection().await;
50//! # /*
51//! let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
52//! # */
53//! let data = users
54//!     // use ordinary diesel query dsl here
55//!     .filter(id.gt(0))
56//!     // execute the query via the provided
57//!     // async variant of `diesel_async::RunQueryDsl`
58//!     .load::<(i32, String)>(&mut connection)
59//!     .await?;
60//! let expected_data = vec![
61//!     (1, String::from("Sean")),
62//!     (2, String::from("Tess")),
63//! ];
64//! assert_eq!(expected_data, data);
65//! #     Ok(())
66//! # }
67//! ```
68//!
69//! ## Crate features:
70//!
71//! * `postgres`: Enables the [`AsyncPgConnection`] implementation
72//! * `mysql`: Enables the [`AsyncMysqlConnection`] implementation
73//! * `sqlite`: Enables the [`SyncConnectionWrapper`](crate::sync_connection_wrapper::SyncConnectionWrapper)
74//!   and everything required to work with SQLite
//! * `sync-connection-wrapper`: Enables the
//!   [`SyncConnectionWrapper`](crate::sync_connection_wrapper::SyncConnectionWrapper) which allows
//!   wrapping sync connections from [`diesel`] in an async connection wrapper
//! * `async-connection-wrapper`: Enables the [`AsyncConnectionWrapper`](crate::async_connection_wrapper::AsyncConnectionWrapper)
//!   which allows using connection implementations from this crate as a sync [`diesel::Connection`]
81//! * `migrations`: Enables the [`AsyncMigrationHarness`] to execute migrations via
82//!   [`diesel_migrations`]
83//! * `pool`: Enables general support for connection pools
84//! * `r2d2`: Enables support for pooling via the [`r2d2`] crate
85//! * `bb8`: Enables support for pooling via the [`bb8`] crate
86//! * `mobc`: Enables support for pooling via the [`mobc`] crate
87//! * `deadpool`: Enables support for pooling via the [`deadpool`] crate
88
89#![warn(
90    missing_docs,
91    clippy::cast_possible_wrap,
92    clippy::cast_possible_truncation,
93    clippy::cast_sign_loss
94)]
95
96use diesel::backend::Backend;
97use diesel::connection::{CacheSize, Instrumentation};
98use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
99use diesel::row::Row;
100use diesel::{ConnectionResult, QueryResult};
101use futures_core::future::BoxFuture;
102use futures_core::Stream;
103use futures_util::FutureExt;
104use std::fmt::Debug;
105use std::future::Future;
106
107pub use scoped_futures;
108use scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
109
110#[cfg(feature = "async-connection-wrapper")]
111pub mod async_connection_wrapper;
112mod deref_connection;
113#[cfg(feature = "migrations")]
114mod migrations;
115#[cfg(feature = "mysql")]
116mod mysql;
117#[cfg(feature = "postgres")]
118pub mod pg;
119#[cfg(feature = "pool")]
120pub mod pooled_connection;
121mod run_query_dsl;
122#[cfg(any(feature = "postgres", feature = "mysql"))]
123mod stmt_cache;
124#[cfg(feature = "sync-connection-wrapper")]
125pub mod sync_connection_wrapper;
126mod transaction_manager;
127
128#[cfg(feature = "mysql")]
129#[doc(inline)]
130pub use self::mysql::AsyncMysqlConnection;
131#[cfg(feature = "mysql")]
132#[doc(inline)]
133pub use self::mysql::MysqlCancelToken;
134#[cfg(feature = "postgres")]
135#[doc(inline)]
136pub use self::pg::AsyncPgConnection;
137#[doc(inline)]
138pub use self::run_query_dsl::*;
139
140#[doc(inline)]
141#[cfg(feature = "migrations")]
142pub use self::migrations::AsyncMigrationHarness;
143#[doc(inline)]
144pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
145
146/// Perform simple operations on a backend.
147///
148/// You should likely use [`AsyncConnection`] instead.
149pub trait SimpleAsyncConnection {
150    /// Execute multiple SQL statements within the same string.
151    ///
152    /// This function is used to execute migrations,
153    /// which may contain more than one SQL statement.
154    fn batch_execute(&mut self, query: &str) -> impl Future<Output = QueryResult<()>> + Send;
155}
156
157/// Core trait for an async database connection
158pub trait AsyncConnectionCore: SimpleAsyncConnection + Send {
159    /// The future returned by `AsyncConnection::execute`
160    type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
161    /// The future returned by `AsyncConnection::load`
162    type LoadFuture<'conn, 'query>: Future<Output = QueryResult<Self::Stream<'conn, 'query>>> + Send;
163    /// The inner stream returned by `AsyncConnection::load`
164    type Stream<'conn, 'query>: Stream<Item = QueryResult<Self::Row<'conn, 'query>>> + Send;
165    /// The row type used by the stream returned by `AsyncConnection::load`
166    type Row<'conn, 'query>: Row<'conn, Self::Backend>;
167
168    /// The backend this type connects to
169    type Backend: Backend;
170
171    #[doc(hidden)]
172    fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
173    where
174        T: AsQuery + 'query,
175        T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
176
177    #[doc(hidden)]
178    fn execute_returning_count<'conn, 'query, T>(
179        &'conn mut self,
180        source: T,
181    ) -> Self::ExecuteFuture<'conn, 'query>
182    where
183        T: QueryFragment<Self::Backend> + QueryId + 'query;
184
185    // These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
186    // compile without a `where Self: '_` clause. This is needed the because bound causes
187    // lifetime issues when using `transaction()` with generic `AsyncConnection`s.
188    //
189    // See: https://github.com/rust-lang/rust/issues/87479
190    #[doc(hidden)]
191    fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
192    #[doc(hidden)]
193    fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
194}
195
196/// An async connection to a database
197///
198/// This trait represents an async database connection. It can be used to query the database through
199/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
200/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
201pub trait AsyncConnection: AsyncConnectionCore + Sized {
202    #[doc(hidden)]
203    type TransactionManager: TransactionManager<Self>;
204
205    /// Establishes a new connection to the database
206    ///
207    /// The argument to this method and the method's behavior varies by backend.
208    /// See the documentation for that backend's connection class
209    /// for details about what it accepts and how it behaves.
210    fn establish(database_url: &str) -> impl Future<Output = ConnectionResult<Self>> + Send;
211
212    /// Executes the given function inside of a database transaction
213    ///
214    /// This function executes the provided closure `f` inside a database
215    /// transaction. If there is already an open transaction for the current
216    /// connection savepoints will be used instead. The connection is committed if
217    /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
218    /// For both cases the original result value will be returned from this function.
219    ///
220    /// If the transaction fails to commit due to a `SerializationFailure` or a
221    /// `ReadOnlyTransaction` a rollback will be attempted.
222    /// If the rollback fails, the error will be returned in a
223    /// [`Error::RollbackErrorOnCommit`](diesel::result::Error::RollbackErrorOnCommit),
224    /// from which you will be able to extract both the original commit error and
225    /// the rollback error.
226    /// In addition, the connection will be considered broken
227    /// as it contains a uncommitted unabortable open transaction. Any further
228    /// interaction with the transaction system will result in an returned error
229    /// in this case.
230    ///
231    /// If the closure returns an `Err(_)` and the rollback fails the function
232    /// will return that rollback error directly, and the transaction manager will
233    /// be marked as broken as it contains a uncommitted unabortable open transaction.
234    ///
235    /// If a nested transaction fails to release the corresponding savepoint
236    /// the error will be returned directly.
237    ///
238    /// **WARNING:** Canceling the returned future does currently **not**
239    /// close an already open transaction. You may end up with a connection
240    /// containing a dangling transaction.
241    ///
242    /// # Example
243    ///
244    /// ```rust
245    /// # include!("doctest_setup.rs");
246    /// use diesel::result::Error;
247    /// use scoped_futures::ScopedFutureExt;
248    /// use diesel_async::{RunQueryDsl, AsyncConnection};
249    ///
250    /// # #[tokio::main(flavor = "current_thread")]
251    /// # async fn main() {
252    /// #     run_test().await.unwrap();
253    /// # }
254    /// #
255    /// # async fn run_test() -> QueryResult<()> {
256    /// #     use schema::users::dsl::*;
257    /// #     let conn = &mut establish_connection().await;
258    /// conn.transaction::<_, Error, _>(|conn| async move {
259    ///     diesel::insert_into(users)
260    ///         .values(name.eq("Ruby"))
261    ///         .execute(conn)
262    ///         .await?;
263    ///
264    ///     let all_names = users.select(name).load::<String>(conn).await?;
265    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
266    ///
267    ///     Ok(())
268    /// }.scope_boxed()).await?;
269    ///
270    /// conn.transaction::<(), _, _>(|conn| async move {
271    ///     diesel::insert_into(users)
272    ///         .values(name.eq("Pascal"))
273    ///         .execute(conn)
274    ///         .await?;
275    ///
276    ///     let all_names = users.select(name).load::<String>(conn).await?;
277    ///     assert_eq!(vec!["Sean", "Tess", "Ruby", "Pascal"], all_names);
278    ///
279    ///     // If we want to roll back the transaction, but don't have an
280    ///     // actual error to return, we can return `RollbackTransaction`.
281    ///     Err(Error::RollbackTransaction)
282    /// }.scope_boxed()).await;
283    ///
284    /// let all_names = users.select(name).load::<String>(conn).await?;
285    /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
286    /// #     Ok(())
287    /// # }
288    /// ```
289    fn transaction<'a, 'conn, R, E, F>(
290        &'conn mut self,
291        callback: F,
292    ) -> BoxFuture<'conn, Result<R, E>>
293    // we cannot use `impl Trait` here due to bugs in rustc
294    // https://github.com/rust-lang/rust/issues/100013
295    //impl Future<Output = Result<R, E>> + Send + 'async_trait
296    where
297        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
298        E: From<diesel::result::Error> + Send + 'a,
299        R: Send + 'a,
300        'a: 'conn,
301    {
302        Self::TransactionManager::transaction(self, callback).boxed()
303    }
304
305    /// Creates a transaction that will never be committed. This is useful for
306    /// tests. Panics if called while inside of a transaction or
307    /// if called with a connection containing a broken transaction
308    fn begin_test_transaction(&mut self) -> impl Future<Output = QueryResult<()>> + Send {
309        use diesel::connection::TransactionManagerStatus;
310
311        async {
312            match Self::TransactionManager::transaction_manager_status_mut(self) {
313                TransactionManagerStatus::Valid(valid_status) => {
314                    assert_eq!(None, valid_status.transaction_depth())
315                }
316                TransactionManagerStatus::InError => panic!("Transaction manager in error"),
317            };
318            Self::TransactionManager::begin_transaction(self).await?;
319            // set the test transaction flag
320            // to prevent that this connection gets dropped in connection pools
321            // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
322            // to prevent modifications to the schema
323            Self::TransactionManager::transaction_manager_status_mut(self)
324                .set_test_transaction_flag();
325            Ok(())
326        }
327    }
328
329    /// Executes the given function inside a transaction, but does not commit
330    /// it. Panics if the given function returns an error.
331    ///
332    /// # Example
333    ///
334    /// ```rust
335    /// # include!("doctest_setup.rs");
336    /// use diesel::result::Error;
337    /// use scoped_futures::ScopedFutureExt;
338    /// use diesel_async::{RunQueryDsl, AsyncConnection};
339    ///
340    /// # #[tokio::main(flavor = "current_thread")]
341    /// # async fn main() {
342    /// #     run_test().await.unwrap();
343    /// # }
344    /// #
345    /// # async fn run_test() -> QueryResult<()> {
346    /// #     use schema::users::dsl::*;
347    /// #     let conn = &mut establish_connection().await;
348    /// conn.test_transaction::<_, Error, _>(|conn| async move {
349    ///     diesel::insert_into(users)
350    ///         .values(name.eq("Ruby"))
351    ///         .execute(conn)
352    ///         .await?;
353    ///
354    ///     let all_names = users.select(name).load::<String>(conn).await?;
355    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
356    ///
357    ///     Ok(())
358    /// }.scope_boxed()).await;
359    ///
360    /// // Even though we returned `Ok`, the transaction wasn't committed.
361    /// let all_names = users.select(name).load::<String>(conn).await?;
362    /// assert_eq!(vec!["Sean", "Tess"], all_names);
363    /// #     Ok(())
364    /// # }
365    /// ```
366    fn test_transaction<'conn, 'a, R, E, F>(
367        &'conn mut self,
368        f: F,
369    ) -> impl Future<Output = R> + Send + 'conn
370    where
371        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
372        E: Debug + Send + 'a,
373        R: Send + 'a,
374        'a: 'conn,
375    {
376        use futures_util::TryFutureExt;
377        let (user_result_tx, user_result_rx) = std::sync::mpsc::channel();
378        self.transaction::<R, _, _>(move |conn| {
379            f(conn)
380                .map_err(|_| diesel::result::Error::RollbackTransaction)
381                .and_then(move |r| {
382                    let _ = user_result_tx.send(r);
383                    std::future::ready(Err(diesel::result::Error::RollbackTransaction))
384                })
385                .scope_boxed()
386        })
387        .then(move |_r| {
388            let r = user_result_rx
389                .try_recv()
390                .expect("Transaction did not succeed");
391            std::future::ready(r)
392        })
393    }
394
395    #[doc(hidden)]
396    fn transaction_state(
397        &mut self,
398    ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
399
400    #[doc(hidden)]
401    fn instrumentation(&mut self) -> &mut dyn Instrumentation;
402
403    /// Set a specific [`Instrumentation`] implementation for this connection
404    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation);
405
406    /// Set the prepared statement cache size to [`CacheSize`] for this connection
407    fn set_prepared_statement_cache_size(&mut self, size: CacheSize);
408}