diesel_async/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2//! Diesel-async provides async variants of diesel related query functionality
3//!
4//! diesel-async is an extension to diesel itself. It is designed to be used together
5//! with the main diesel crate. It only provides async variants of core diesel traits,
6//! that perform actual io-work.
//! This includes async counterparts of the following traits:
8//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.0.x/diesel/prelude/trait.RunQueryDsl.html)
9//!    -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl)
10//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.0.x/diesel/connection/trait.Connection.html)
11//!    -> [`diesel_async::AsyncConnection`](crate::AsyncConnection)
12//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.0.x/diesel/query_dsl/trait.UpdateAndFetchResults.html)
13//!    -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults)
14//!
//! These traits closely mirror their diesel counterparts while providing async functionality.
16//!
17//! In addition to these core traits 3 fully async connection implementations are provided
18//! by diesel-async:
19//!
20//! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature)
21//! * [`AsyncPgConnection`] (enabled by the `postgres` feature)
22//! * [`SyncConnectionWrapper`](sync_connection_wrapper::SyncConnectionWrapper) (enabled by the `sync-connection-wrapper`/`sqlite` feature)
23//!
24//! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait
25//! method calls and connections with their async counterparts.
26//!
27//! ```rust
28//! # include!("./doctest_setup.rs");
29//! #
30//! use diesel::prelude::*;
31//! use diesel_async::{RunQueryDsl, AsyncConnection};
32//!
33//! diesel::table! {
34//!    users(id) {
35//!        id -> Integer,
36//!        name -> Text,
37//!    }
38//! }
39//! #
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! #     run_test().await;
43//! # }
44//! #
45//! # async fn run_test() -> QueryResult<()> {
46//!
47//! use crate::users::dsl::*;
48//!
49//! # let mut connection = establish_connection().await;
50//! # /*
51//! let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
52//! # */
53//! let data = users
54//!     // use ordinary diesel query dsl here
55//!     .filter(id.gt(0))
56//!     // execute the query via the provided
57//!     // async variant of `diesel_async::RunQueryDsl`
58//!     .load::<(i32, String)>(&mut connection)
59//!     .await?;
60//! let expected_data = vec![
61//!     (1, String::from("Sean")),
62//!     (2, String::from("Tess")),
63//! ];
64//! assert_eq!(expected_data, data);
65//! #     Ok(())
66//! # }
67//! ```
68
69#![warn(
70    missing_docs,
71    clippy::cast_possible_wrap,
72    clippy::cast_possible_truncation,
73    clippy::cast_sign_loss
74)]
75use diesel::backend::Backend;
76use diesel::connection::Instrumentation;
77use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
78use diesel::row::Row;
79use diesel::{ConnectionResult, QueryResult};
80use futures_core::future::BoxFuture;
81use futures_core::Stream;
82use futures_util::FutureExt;
83use std::fmt::Debug;
84use std::future::Future;
85
86pub use scoped_futures;
87use scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
88
89#[cfg(feature = "async-connection-wrapper")]
90pub mod async_connection_wrapper;
91#[cfg(feature = "mysql")]
92mod mysql;
93#[cfg(feature = "postgres")]
94pub mod pg;
95#[cfg(feature = "pool")]
96pub mod pooled_connection;
97mod run_query_dsl;
98mod statement_cache;
99#[cfg(any(feature = "postgres", feature = "mysql"))]
100mod stmt_cache;
101#[cfg(feature = "sync-connection-wrapper")]
102pub mod sync_connection_wrapper;
103mod transaction_manager;
104
105#[cfg(feature = "mysql")]
106#[doc(inline)]
107pub use self::mysql::AsyncMysqlConnection;
108#[cfg(feature = "postgres")]
109#[doc(inline)]
110pub use self::pg::AsyncPgConnection;
111#[doc(inline)]
112pub use self::run_query_dsl::*;
113
114#[doc(inline)]
115pub use self::statement_cache::CacheSize;
116#[doc(inline)]
117pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
118
119/// Perform simple operations on a backend.
120///
121/// You should likely use [`AsyncConnection`] instead.
122pub trait SimpleAsyncConnection {
123    /// Execute multiple SQL statements within the same string.
124    ///
125    /// This function is used to execute migrations,
126    /// which may contain more than one SQL statement.
127    fn batch_execute(&mut self, query: &str) -> impl Future<Output = QueryResult<()>> + Send;
128}
129
130/// An async connection to a database
131///
132/// This trait represents a n async database connection. It can be used to query the database through
133/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
134/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
135pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
136    /// The future returned by `AsyncConnection::execute`
137    type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
138    /// The future returned by `AsyncConnection::load`
139    type LoadFuture<'conn, 'query>: Future<Output = QueryResult<Self::Stream<'conn, 'query>>> + Send;
140    /// The inner stream returned by `AsyncConnection::load`
141    type Stream<'conn, 'query>: Stream<Item = QueryResult<Self::Row<'conn, 'query>>> + Send;
142    /// The row type used by the stream returned by `AsyncConnection::load`
143    type Row<'conn, 'query>: Row<'conn, Self::Backend>;
144
145    /// The backend this type connects to
146    type Backend: Backend;
147
148    #[doc(hidden)]
149    type TransactionManager: TransactionManager<Self>;
150
151    /// Establishes a new connection to the database
152    ///
153    /// The argument to this method and the method's behavior varies by backend.
154    /// See the documentation for that backend's connection class
155    /// for details about what it accepts and how it behaves.
156    fn establish(database_url: &str) -> impl Future<Output = ConnectionResult<Self>> + Send;
157
158    /// Executes the given function inside of a database transaction
159    ///
160    /// This function executes the provided closure `f` inside a database
161    /// transaction. If there is already an open transaction for the current
162    /// connection savepoints will be used instead. The connection is committed if
163    /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
164    /// For both cases the original result value will be returned from this function.
165    ///
166    /// If the transaction fails to commit due to a `SerializationFailure` or a
167    /// `ReadOnlyTransaction` a rollback will be attempted.
168    /// If the rollback fails, the error will be returned in a
169    /// [`Error::RollbackErrorOnCommit`](diesel::result::Error::RollbackErrorOnCommit),
170    /// from which you will be able to extract both the original commit error and
171    /// the rollback error.
172    /// In addition, the connection will be considered broken
173    /// as it contains a uncommitted unabortable open transaction. Any further
174    /// interaction with the transaction system will result in an returned error
175    /// in this case.
176    ///
177    /// If the closure returns an `Err(_)` and the rollback fails the function
178    /// will return that rollback error directly, and the transaction manager will
179    /// be marked as broken as it contains a uncommitted unabortable open transaction.
180    ///
181    /// If a nested transaction fails to release the corresponding savepoint
182    /// the error will be returned directly.
183    ///
184    /// **WARNING:** Canceling the returned future does currently **not**
185    /// close an already open transaction. You may end up with a connection
186    /// containing a dangling transaction.
187    ///
188    /// # Example
189    ///
190    /// ```rust
191    /// # include!("doctest_setup.rs");
192    /// use diesel::result::Error;
193    /// use scoped_futures::ScopedFutureExt;
194    /// use diesel_async::{RunQueryDsl, AsyncConnection};
195    ///
196    /// # #[tokio::main(flavor = "current_thread")]
197    /// # async fn main() {
198    /// #     run_test().await.unwrap();
199    /// # }
200    /// #
201    /// # async fn run_test() -> QueryResult<()> {
202    /// #     use schema::users::dsl::*;
203    /// #     let conn = &mut establish_connection().await;
204    /// conn.transaction::<_, Error, _>(|conn| async move {
205    ///     diesel::insert_into(users)
206    ///         .values(name.eq("Ruby"))
207    ///         .execute(conn)
208    ///         .await?;
209    ///
210    ///     let all_names = users.select(name).load::<String>(conn).await?;
211    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
212    ///
213    ///     Ok(())
214    /// }.scope_boxed()).await?;
215    ///
216    /// conn.transaction::<(), _, _>(|conn| async move {
217    ///     diesel::insert_into(users)
218    ///         .values(name.eq("Pascal"))
219    ///         .execute(conn)
220    ///         .await?;
221    ///
222    ///     let all_names = users.select(name).load::<String>(conn).await?;
223    ///     assert_eq!(vec!["Sean", "Tess", "Ruby", "Pascal"], all_names);
224    ///
225    ///     // If we want to roll back the transaction, but don't have an
226    ///     // actual error to return, we can return `RollbackTransaction`.
227    ///     Err(Error::RollbackTransaction)
228    /// }.scope_boxed()).await;
229    ///
230    /// let all_names = users.select(name).load::<String>(conn).await?;
231    /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
232    /// #     Ok(())
233    /// # }
234    /// ```
235    fn transaction<'a, 'conn, R, E, F>(
236        &'conn mut self,
237        callback: F,
238    ) -> BoxFuture<'conn, Result<R, E>>
239    // we cannot use `impl Trait` here due to bugs in rustc
240    // https://github.com/rust-lang/rust/issues/100013
241    //impl Future<Output = Result<R, E>> + Send + 'async_trait
242    where
243        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
244        E: From<diesel::result::Error> + Send + 'a,
245        R: Send + 'a,
246        'a: 'conn,
247    {
248        Self::TransactionManager::transaction(self, callback).boxed()
249    }
250
251    /// Creates a transaction that will never be committed. This is useful for
252    /// tests. Panics if called while inside of a transaction or
253    /// if called with a connection containing a broken transaction
254    fn begin_test_transaction(&mut self) -> impl Future<Output = QueryResult<()>> + Send {
255        use diesel::connection::TransactionManagerStatus;
256
257        async {
258            match Self::TransactionManager::transaction_manager_status_mut(self) {
259                TransactionManagerStatus::Valid(valid_status) => {
260                    assert_eq!(None, valid_status.transaction_depth())
261                }
262                TransactionManagerStatus::InError => panic!("Transaction manager in error"),
263            };
264            Self::TransactionManager::begin_transaction(self).await?;
265            // set the test transaction flag
266            // to prevent that this connection gets dropped in connection pools
267            // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
268            // to prevent modifications to the schema
269            Self::TransactionManager::transaction_manager_status_mut(self)
270                .set_test_transaction_flag();
271            Ok(())
272        }
273    }
274
275    /// Executes the given function inside a transaction, but does not commit
276    /// it. Panics if the given function returns an error.
277    ///
278    /// # Example
279    ///
280    /// ```rust
281    /// # include!("doctest_setup.rs");
282    /// use diesel::result::Error;
283    /// use scoped_futures::ScopedFutureExt;
284    /// use diesel_async::{RunQueryDsl, AsyncConnection};
285    ///
286    /// # #[tokio::main(flavor = "current_thread")]
287    /// # async fn main() {
288    /// #     run_test().await.unwrap();
289    /// # }
290    /// #
291    /// # async fn run_test() -> QueryResult<()> {
292    /// #     use schema::users::dsl::*;
293    /// #     let conn = &mut establish_connection().await;
294    /// conn.test_transaction::<_, Error, _>(|conn| async move {
295    ///     diesel::insert_into(users)
296    ///         .values(name.eq("Ruby"))
297    ///         .execute(conn)
298    ///         .await?;
299    ///
300    ///     let all_names = users.select(name).load::<String>(conn).await?;
301    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
302    ///
303    ///     Ok(())
304    /// }.scope_boxed()).await;
305    ///
306    /// // Even though we returned `Ok`, the transaction wasn't committed.
307    /// let all_names = users.select(name).load::<String>(conn).await?;
308    /// assert_eq!(vec!["Sean", "Tess"], all_names);
309    /// #     Ok(())
310    /// # }
311    /// ```
312    fn test_transaction<'conn, 'a, R, E, F>(
313        &'conn mut self,
314        f: F,
315    ) -> impl Future<Output = R> + Send + 'conn
316    where
317        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
318        E: Debug + Send + 'a,
319        R: Send + 'a,
320        'a: 'conn,
321    {
322        use futures_util::TryFutureExt;
323        let (user_result_tx, user_result_rx) = std::sync::mpsc::channel();
324        self.transaction::<R, _, _>(move |conn| {
325            f(conn)
326                .map_err(|_| diesel::result::Error::RollbackTransaction)
327                .and_then(move |r| {
328                    let _ = user_result_tx.send(r);
329                    std::future::ready(Err(diesel::result::Error::RollbackTransaction))
330                })
331                .scope_boxed()
332        })
333        .then(move |_r| {
334            let r = user_result_rx
335                .try_recv()
336                .expect("Transaction did not succeed");
337            std::future::ready(r)
338        })
339    }
340
341    #[doc(hidden)]
342    fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
343    where
344        T: AsQuery + 'query,
345        T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
346
347    #[doc(hidden)]
348    fn execute_returning_count<'conn, 'query, T>(
349        &'conn mut self,
350        source: T,
351    ) -> Self::ExecuteFuture<'conn, 'query>
352    where
353        T: QueryFragment<Self::Backend> + QueryId + 'query;
354
355    #[doc(hidden)]
356    fn transaction_state(
357        &mut self,
358    ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
359
360    // These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
361    // compile without a `where Self: '_` clause. This is needed the because bound causes
362    // lifetime issues when using `transaction()` with generic `AsyncConnection`s.
363    //
364    // See: https://github.com/rust-lang/rust/issues/87479
365    #[doc(hidden)]
366    fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
367    #[doc(hidden)]
368    fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
369
370    #[doc(hidden)]
371    fn instrumentation(&mut self) -> &mut dyn Instrumentation;
372
373    /// Set a specific [`Instrumentation`] implementation for this connection
374    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation);
375
376    /// Set the prepared statement cache size to [`CacheSize`] for this connection
377    fn set_prepared_statement_cache_size(&mut self, size: CacheSize);
378}