Skip to main content

diesel_async/sync_connection_wrapper/
sqlite.rs

1use diesel::connection::AnsiTransactionManager;
2use diesel::SqliteConnection;
3
4use crate::sync_connection_wrapper::SyncTransactionManagerWrapper;
5use crate::transaction_manager::AsyncFunc;
6use crate::TransactionManager;
7
8use super::SyncConnectionWrapper;
9
10impl SyncConnectionWrapper<SqliteConnection> {
11    /// Run a transaction with `BEGIN IMMEDIATE`
12    ///
13    /// This method will return an error if a transaction is already open.
14    ///
15    /// **WARNING:** Canceling the returned future does currently **not**
16    /// close an already open transaction. You may end up with a connection
17    /// containing a dangling transaction.
18    ///
19    /// # Example
20    ///
21    /// ```rust
22    /// # include!("../doctest_setup.rs");
23    /// use diesel::result::Error;
24    /// use diesel_async::{RunQueryDsl, AsyncConnection};
25    /// #
26    /// # #[tokio::main(flavor = "current_thread")]
27    /// # async fn main() {
28    /// #     run_test().await.unwrap();
29    /// # }
30    /// #
31    /// # async fn run_test() -> QueryResult<()> {
32    /// #     use schema::users::dsl::*;
33    /// #     let conn = &mut connection_no_transaction().await;
34    /// conn.immediate_transaction(async |conn| {
35    ///     diesel::insert_into(users)
36    ///         .values(name.eq("Ruby"))
37    ///         .execute(conn)
38    ///         .await?;
39    ///
40    ///     let all_names = users.select(name).load::<String>(conn).await?;
41    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
42    ///
43    ///     Ok(())
44    /// }).await
45    /// # }
46    /// ```
47    pub async fn immediate_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
48    where
49        for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
50            + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
51            + Send
52            + 'a,
53        E: From<diesel::result::Error> + Send + 'a,
54        R: Send + 'a,
55    {
56        self.transaction_sql(f, "BEGIN IMMEDIATE").await
57    }
58
59    /// Run a transaction with `BEGIN EXCLUSIVE`
60    ///
61    /// This method will return an error if a transaction is already open.
62    ///
63    /// **WARNING:** Canceling the returned future does currently **not**
64    /// close an already open transaction. You may end up with a connection
65    /// containing a dangling transaction.
66    ///
67    /// # Example
68    ///
69    /// ```rust
70    /// # include!("../doctest_setup.rs");
71    /// use diesel::result::Error;
72    /// use diesel_async::{RunQueryDsl, AsyncConnection};
73    /// #
74    /// # #[tokio::main(flavor = "current_thread")]
75    /// # async fn main() {
76    /// #     run_test().await.unwrap();
77    /// # }
78    /// #
79    /// # async fn run_test() -> QueryResult<()> {
80    /// #     use schema::users::dsl::*;
81    /// #     let conn = &mut connection_no_transaction().await;
82    /// conn.exclusive_transaction(async |conn|  {
83    ///     diesel::insert_into(users)
84    ///         .values(name.eq("Ruby"))
85    ///         .execute(conn)
86    ///         .await?;
87    ///
88    ///     let all_names = users.select(name).load::<String>(conn).await?;
89    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
90    ///
91    ///     Ok(())
92    /// }).await
93    /// # }
94    /// ```
95    pub async fn exclusive_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
96    where
97        for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
98            + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
99            + Send
100            + 'a,
101        E: From<diesel::result::Error> + Send + 'a,
102        R: Send + 'a,
103    {
104        self.transaction_sql(f, "BEGIN EXCLUSIVE").await
105    }
106
107    async fn transaction_sql<'a, R, E, F>(&mut self, f: F, sql: &'static str) -> Result<R, E>
108    where
109        for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
110            + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
111            + Send
112            + 'a,
113        E: From<diesel::result::Error> + Send + 'a,
114        R: Send + 'a,
115    {
116        self.spawn_blocking(|conn| AnsiTransactionManager::begin_transaction_sql(conn, sql))
117            .await?;
118
119        match f(&mut *self).await {
120            Ok(value) => {
121                SyncTransactionManagerWrapper::<AnsiTransactionManager>::commit_transaction(
122                    &mut *self,
123                )
124                .await?;
125                Ok(value)
126            }
127            Err(e) => {
128                SyncTransactionManagerWrapper::<AnsiTransactionManager>::rollback_transaction(
129                    &mut *self,
130                )
131                .await?;
132                Err(e)
133            }
134        }
135    }
136}