diesel_async/pg/transaction_builder.rs
1use crate::transaction_manager::AsyncFunc;
2use crate::{AnsiTransactionManager, AsyncConnection, TransactionManager};
3use diesel::backend::Backend;
4use diesel::pg::Pg;
5use diesel::query_builder::{AstPass, QueryBuilder, QueryFragment};
6use diesel::QueryResult;
7
8/// Used to build a transaction, specifying additional details.
9///
10/// This struct is returned by [`AsyncPgConnection::build_transaction`].
11/// See the documentation for methods on this struct for usage examples.
12/// See [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs]
13/// for details on the behavior of each option.
14///
15/// [`AsyncPgConnection::build_transaction`]: super::AsyncPgConnection::build_transaction()
16/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
17#[must_use = "Transaction builder does nothing unless you call `run` on it"]
18#[cfg(feature = "postgres")]
19pub struct TransactionBuilder<'a, C> {
20 connection: &'a mut C,
21 isolation_level: Option<IsolationLevel>,
22 read_mode: Option<ReadMode>,
23 deferrable: Option<Deferrable>,
24}
25
26impl<'a, C> TransactionBuilder<'a, C>
27where
28 C: AsyncConnection<Backend = Pg, TransactionManager = AnsiTransactionManager>,
29{
30 pub(crate) fn new(connection: &'a mut C) -> Self {
31 Self {
32 connection,
33 isolation_level: None,
34 read_mode: None,
35 deferrable: None,
36 }
37 }
38
39 /// Makes the transaction `READ ONLY`
40 ///
41 /// # Example
42 ///
43 /// ```rust
44 /// # include!("../doctest_setup.rs");
45 /// # use diesel::sql_query;
46 /// use diesel_async::RunQueryDsl;
47 /// #
48 /// # #[tokio::main(flavor = "current_thread")]
49 /// # async fn main() {
50 /// # run_test().await.unwrap();
51 /// # }
52 /// #
53 /// # diesel::table! {
54 /// # users_for_read_only {
55 /// # id -> Integer,
56 /// # name -> Text,
57 /// # }
58 /// # }
59 /// #
60 /// # async fn run_test() -> QueryResult<()> {
61 /// # use users_for_read_only::table as users;
62 /// # use users_for_read_only::columns::*;
63 /// # let conn = &mut connection_no_transaction().await;
64 /// # sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
65 /// # id SERIAL PRIMARY KEY,
66 /// # name TEXT NOT NULL
67 /// # )").execute(conn).await?;
68 /// conn.build_transaction()
69 /// .read_only()
70 /// .run::<_, diesel::result::Error, _>(async |conn| {
71 /// let read_attempt = users.select(name).load::<String>(conn).await;
72 /// assert!(read_attempt.is_ok());
73 ///
74 /// let write_attempt = diesel::insert_into(users)
75 /// .values(name.eq("Ruby"))
76 /// .execute(conn)
77 /// .await;
78 /// assert!(write_attempt.is_err());
79 ///
80 /// Ok(())
81 /// }).await?;
82 /// # sql_query("DROP TABLE users_for_read_only").execute(conn).await?;
83 /// # Ok(())
84 /// # }
85 /// ```
86 pub fn read_only(mut self) -> Self {
87 self.read_mode = Some(ReadMode::ReadOnly);
88 self
89 }
90
91 /// Makes the transaction `READ WRITE`
92 ///
93 /// This is the default, unless you've changed the
94 /// `default_transaction_read_only` configuration parameter.
95 ///
96 /// # Example
97 ///
98 /// ```rust
99 /// # include!("../doctest_setup.rs");
100 /// # use diesel::result::Error::RollbackTransaction;
101 /// # use diesel::sql_query;
102 /// use diesel_async::RunQueryDsl;
103 ///
104 /// #
105 /// # #[tokio::main(flavor = "current_thread")]
106 /// # async fn main() {
107 /// # assert_eq!(run_test().await, Err(RollbackTransaction));
108 /// # }
109 /// #
110 /// # async fn run_test() -> QueryResult<()> {
111 /// # use schema::users::dsl::*;
112 /// # let conn = &mut connection_no_transaction().await;
113 /// conn.build_transaction()
114 /// .read_write()
115 /// .run(async |conn| {
116 /// # sql_query("CREATE TABLE IF NOT EXISTS users (
117 /// # id SERIAL PRIMARY KEY,
118 /// # name TEXT NOT NULL
119 /// # )").execute(conn).await?;
120 /// let read_attempt = users.select(name).load::<String>(conn).await;
121 /// assert!(read_attempt.is_ok());
122 ///
123 /// let write_attempt = diesel::insert_into(users)
124 /// .values(name.eq("Ruby"))
125 /// .execute(conn)
126 /// .await;
127 /// assert!(write_attempt.is_ok());
128 ///
129 /// # Err(RollbackTransaction)
130 /// # /*
131 /// Ok(())
132 /// # */
133 /// }).await
134 /// # }
135 /// ```
136 pub fn read_write(mut self) -> Self {
137 self.read_mode = Some(ReadMode::ReadWrite);
138 self
139 }
140
141 /// Makes the transaction `DEFERRABLE`
142 ///
143 /// # Example
144 ///
145 /// ```rust
146 /// # include!("../doctest_setup.rs");
147 /// #
148 /// # #[tokio::main(flavor = "current_thread")]
149 /// # async fn main() {
150 /// # run_test().await.unwrap();
151 /// # }
152 /// #
153 /// # async fn run_test() -> QueryResult<()> {
154 /// # use schema::users::dsl::*;
155 /// # let conn = &mut connection_no_transaction().await;
156 /// conn.build_transaction()
157 /// .deferrable()
158 /// .run(async |conn| Ok(()))
159 /// .await
160 /// # }
161 /// ```
162 pub fn deferrable(mut self) -> Self {
163 self.deferrable = Some(Deferrable::Deferrable);
164 self
165 }
166
167 /// Makes the transaction `NOT DEFERRABLE`
168 ///
169 /// This is the default, unless you've changed the
170 /// `default_transaction_deferrable` configuration parameter.
171 ///
172 /// # Example
173 ///
174 /// ```rust
175 /// # include!("../doctest_setup.rs");
176 /// #
177 /// # #[tokio::main(flavor = "current_thread")]
178 /// # async fn main() {
179 /// # run_test().await.unwrap();
180 /// # }
181 /// #
182 /// # async fn run_test() -> QueryResult<()> {
183 /// # use schema::users::dsl::*;
184 /// # let conn = &mut connection_no_transaction().await;
185 /// conn.build_transaction()
186 /// .not_deferrable()
187 /// .run(async |conn| Ok(()))
188 /// .await
189 /// # }
190 /// ```
191 pub fn not_deferrable(mut self) -> Self {
192 self.deferrable = Some(Deferrable::NotDeferrable);
193 self
194 }
195
196 /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
197 ///
198 /// This is the default, unless you've changed the
199 /// `default_transaction_isolation_level` configuration parameter.
200 ///
201 /// # Example
202 ///
203 /// ```rust
204 /// # include!("../doctest_setup.rs");
205 /// #
206 /// # #[tokio::main(flavor = "current_thread")]
207 /// # async fn main() {
208 /// # run_test().await.unwrap();
209 /// # }
210 /// #
211 /// # async fn run_test() -> QueryResult<()> {
212 /// # use schema::users::dsl::*;
213 /// # let conn = &mut connection_no_transaction().await;
214 /// conn.build_transaction()
215 /// .read_committed()
216 /// .run(async |conn| Ok(()))
217 /// .await
218 /// # }
219 /// ```
220 pub fn read_committed(mut self) -> Self {
221 self.isolation_level = Some(IsolationLevel::ReadCommitted);
222 self
223 }
224
225 /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
226 ///
227 /// # Example
228 ///
229 /// ```rust
230 /// # include!("../doctest_setup.rs");
231 /// #
232 /// # #[tokio::main(flavor = "current_thread")]
233 /// # async fn main() {
234 /// # run_test().await.unwrap();
235 /// # }
236 /// #
237 /// # async fn run_test() -> QueryResult<()> {
238 /// # use schema::users::dsl::*;
239 /// # let conn = &mut connection_no_transaction().await;
240 /// conn.build_transaction()
241 /// .repeatable_read()
242 /// .run(async |conn| Ok(()))
243 /// .await
244 /// # }
245 /// ```
246 pub fn repeatable_read(mut self) -> Self {
247 self.isolation_level = Some(IsolationLevel::RepeatableRead);
248 self
249 }
250
251 /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
252 ///
253 /// # Example
254 ///
255 /// ```rust
256 /// # include!("../doctest_setup.rs");
257 /// #
258 /// # #[tokio::main(flavor = "current_thread")]
259 /// # async fn main() {
260 /// # run_test().await.unwrap();
261 /// # }
262 /// #
263 /// # async fn run_test() -> QueryResult<()> {
264 /// # use schema::users::dsl::*;
265 /// # let conn = &mut connection_no_transaction().await;
266 /// conn.build_transaction()
267 /// .serializable()
268 /// .run(async |conn| Ok(()) )
269 /// .await
270 /// # }
271 /// ```
272 pub fn serializable(mut self) -> Self {
273 self.isolation_level = Some(IsolationLevel::Serializable);
274 self
275 }
276
277 /// Runs the given function inside of the transaction
278 /// with the parameters given to this builder.
279 ///
280 /// Returns an error if the connection is already inside a transaction,
281 /// or if the transaction fails to commit or rollback
282 ///
283 /// If the transaction fails to commit due to a `SerializationFailure` or a
284 /// `ReadOnlyTransaction` a rollback will be attempted. If the rollback succeeds,
285 /// the original error will be returned, otherwise the error generated by the rollback
286 /// will be returned. In the second case the connection should be considered broken
287 /// as it contains a uncommitted unabortable open transaction.
288 pub async fn run<'b, T, E, F>(&mut self, f: F) -> Result<T, E>
289 where
290 for<'r> F: AsyncFnOnce(&'r mut C) -> Result<T, E>
291 + AsyncFunc<&'r mut C, Result<T, E>, Fut: Send>
292 + Send
293 + 'a,
294 T: 'b,
295 E: From<diesel::result::Error> + 'b,
296 {
297 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
298 self.to_sql(&mut query_builder, &Pg)?;
299 let sql = query_builder.finish();
300
301 AnsiTransactionManager::begin_transaction_sql(&mut *self.connection, &sql).await?;
302 match f(&mut *self.connection).await {
303 Ok(value) => {
304 AnsiTransactionManager::commit_transaction(&mut *self.connection).await?;
305 Ok(value)
306 }
307 Err(e) => {
308 AnsiTransactionManager::rollback_transaction(&mut *self.connection).await?;
309 Err(e)
310 }
311 }
312 }
313}
314
315impl<C> QueryFragment<Pg> for TransactionBuilder<'_, C> {
316 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
317 out.push_sql("BEGIN TRANSACTION");
318 if let Some(ref isolation_level) = self.isolation_level {
319 isolation_level.walk_ast(out.reborrow())?;
320 }
321 if let Some(ref read_mode) = self.read_mode {
322 read_mode.walk_ast(out.reborrow())?;
323 }
324 if let Some(ref deferrable) = self.deferrable {
325 deferrable.walk_ast(out.reborrow())?;
326 }
327 Ok(())
328 }
329}
330
331#[derive(Debug, Clone, Copy)]
332enum IsolationLevel {
333 ReadCommitted,
334 RepeatableRead,
335 Serializable,
336}
337
338impl QueryFragment<Pg> for IsolationLevel {
339 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
340 out.push_sql(" ISOLATION LEVEL ");
341 match *self {
342 IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
343 IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
344 IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
345 }
346 Ok(())
347 }
348}
349
350#[derive(Debug, Clone, Copy)]
351enum ReadMode {
352 ReadOnly,
353 ReadWrite,
354}
355
356impl QueryFragment<Pg> for ReadMode {
357 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
358 match *self {
359 ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
360 ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
361 }
362 Ok(())
363 }
364}
365
366#[derive(Debug, Clone, Copy)]
367enum Deferrable {
368 Deferrable,
369 NotDeferrable,
370}
371
372impl QueryFragment<Pg> for Deferrable {
373 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
374 match *self {
375 Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
376 Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
377 }
378 Ok(())
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385
386 #[tokio::test]
387 async fn test_transaction_builder_generates_correct_sql() {
388 macro_rules! assert_sql {
389 ($query:expr, $sql:expr) => {
390 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
391 $query.to_sql(&mut query_builder, &Pg).unwrap();
392 let sql = query_builder.finish();
393 assert_eq!(sql, $sql);
394 };
395 }
396
397 let database_url =
398 dbg!(std::env::var("DATABASE_URL")
399 .expect("DATABASE_URL must be set in order to run tests"));
400 let mut conn = crate::AsyncPgConnection::establish(&database_url)
401 .await
402 .unwrap();
403
404 assert_sql!(conn.build_transaction(), "BEGIN TRANSACTION");
405 assert_sql!(
406 conn.build_transaction().read_only(),
407 "BEGIN TRANSACTION READ ONLY"
408 );
409 assert_sql!(
410 conn.build_transaction().read_write(),
411 "BEGIN TRANSACTION READ WRITE"
412 );
413 assert_sql!(
414 conn.build_transaction().deferrable(),
415 "BEGIN TRANSACTION DEFERRABLE"
416 );
417 assert_sql!(
418 conn.build_transaction().not_deferrable(),
419 "BEGIN TRANSACTION NOT DEFERRABLE"
420 );
421 assert_sql!(
422 conn.build_transaction().read_committed(),
423 "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"
424 );
425 assert_sql!(
426 conn.build_transaction().repeatable_read(),
427 "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"
428 );
429 assert_sql!(
430 conn.build_transaction().serializable(),
431 "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"
432 );
433 assert_sql!(
434 conn.build_transaction()
435 .serializable()
436 .deferrable()
437 .read_only(),
438 "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE"
439 );
440 }
441}