diesel_async/sync_connection_wrapper/sqlite.rs
1use diesel::connection::AnsiTransactionManager;
2use diesel::SqliteConnection;
3
4use crate::sync_connection_wrapper::SyncTransactionManagerWrapper;
5use crate::transaction_manager::AsyncFunc;
6use crate::TransactionManager;
7
8use super::SyncConnectionWrapper;
9
10impl SyncConnectionWrapper<SqliteConnection> {
11 /// Run a transaction with `BEGIN IMMEDIATE`
12 ///
13 /// This method will return an error if a transaction is already open.
14 ///
15 /// **WARNING:** Canceling the returned future does currently **not**
16 /// close an already open transaction. You may end up with a connection
17 /// containing a dangling transaction.
18 ///
19 /// # Example
20 ///
21 /// ```rust
22 /// # include!("../doctest_setup.rs");
23 /// use diesel::result::Error;
24 /// use diesel_async::{RunQueryDsl, AsyncConnection};
25 /// #
26 /// # #[tokio::main(flavor = "current_thread")]
27 /// # async fn main() {
28 /// # run_test().await.unwrap();
29 /// # }
30 /// #
31 /// # async fn run_test() -> QueryResult<()> {
32 /// # use schema::users::dsl::*;
33 /// # let conn = &mut connection_no_transaction().await;
34 /// conn.immediate_transaction(async |conn| {
35 /// diesel::insert_into(users)
36 /// .values(name.eq("Ruby"))
37 /// .execute(conn)
38 /// .await?;
39 ///
40 /// let all_names = users.select(name).load::<String>(conn).await?;
41 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
42 ///
43 /// Ok(())
44 /// }).await
45 /// # }
46 /// ```
47 pub async fn immediate_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
48 where
49 for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
50 + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
51 + Send
52 + 'a,
53 E: From<diesel::result::Error> + Send + 'a,
54 R: Send + 'a,
55 {
56 self.transaction_sql(f, "BEGIN IMMEDIATE").await
57 }
58
59 /// Run a transaction with `BEGIN EXCLUSIVE`
60 ///
61 /// This method will return an error if a transaction is already open.
62 ///
63 /// **WARNING:** Canceling the returned future does currently **not**
64 /// close an already open transaction. You may end up with a connection
65 /// containing a dangling transaction.
66 ///
67 /// # Example
68 ///
69 /// ```rust
70 /// # include!("../doctest_setup.rs");
71 /// use diesel::result::Error;
72 /// use diesel_async::{RunQueryDsl, AsyncConnection};
73 /// #
74 /// # #[tokio::main(flavor = "current_thread")]
75 /// # async fn main() {
76 /// # run_test().await.unwrap();
77 /// # }
78 /// #
79 /// # async fn run_test() -> QueryResult<()> {
80 /// # use schema::users::dsl::*;
81 /// # let conn = &mut connection_no_transaction().await;
82 /// conn.exclusive_transaction(async |conn| {
83 /// diesel::insert_into(users)
84 /// .values(name.eq("Ruby"))
85 /// .execute(conn)
86 /// .await?;
87 ///
88 /// let all_names = users.select(name).load::<String>(conn).await?;
89 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
90 ///
91 /// Ok(())
92 /// }).await
93 /// # }
94 /// ```
95 pub async fn exclusive_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
96 where
97 for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
98 + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
99 + Send
100 + 'a,
101 E: From<diesel::result::Error> + Send + 'a,
102 R: Send + 'a,
103 {
104 self.transaction_sql(f, "BEGIN EXCLUSIVE").await
105 }
106
107 async fn transaction_sql<'a, R, E, F>(&mut self, f: F, sql: &'static str) -> Result<R, E>
108 where
109 for<'r> F: AsyncFnOnce(&'r mut Self) -> Result<R, E>
110 + AsyncFunc<&'r mut Self, Result<R, E>, Fut: Send>
111 + Send
112 + 'a,
113 E: From<diesel::result::Error> + Send + 'a,
114 R: Send + 'a,
115 {
116 self.spawn_blocking(|conn| AnsiTransactionManager::begin_transaction_sql(conn, sql))
117 .await?;
118
119 match f(&mut *self).await {
120 Ok(value) => {
121 SyncTransactionManagerWrapper::<AnsiTransactionManager>::commit_transaction(
122 &mut *self,
123 )
124 .await?;
125 Ok(value)
126 }
127 Err(e) => {
128 SyncTransactionManagerWrapper::<AnsiTransactionManager>::rollback_transaction(
129 &mut *self,
130 )
131 .await?;
132 Err(e)
133 }
134 }
135 }
136}