1#![allow(unused_assignments)]
2use std::sync::Arc;
3
4#[cfg(feature = "sqlx-sqlite")]
5use sqlx_core::sql_str::SqlSafeStr;
6#[cfg(feature = "sqlx-dep")]
7use sqlx_core::transaction::TransactionManager;
8use std::sync::Mutex;
9use tracing::instrument;
10
11use crate::{
12 AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
13 QueryResult, SqliteTransactionMode, Statement, TransactionOptions, TransactionSession,
14 TransactionTrait, debug_print, error::*,
15};
16#[cfg(feature = "sqlx-dep")]
17use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
18
19#[cfg(feature = "stream")]
20use crate::{StreamTrait, TransactionStream};
21
/// Handle to a transaction running on a shared database connection.
///
/// Created through `DatabaseTransaction::begin`; finished with `commit` or
/// `rollback`. Dropping a handle that is still open triggers `start_rollback`.
pub struct DatabaseTransaction {
    /// Connection the transaction runs on, shared behind a mutex.
    conn: Arc<Mutex<InnerConnection>>,
    /// Backend reported by `get_database_backend` and attached to tracing spans.
    backend: DbBackend,
    /// `true` from creation until `commit` or `rollback` succeeds.
    open: bool,
    /// Optional callback handed to the metric machinery for each statement.
    metric_callback: Option<crate::metric::Callback>,
    /// Passed as the `record_stmt` flag of the spans created for statements.
    record_stmt_in_spans: bool,
}
40
41#[instrument(level = "trace", skip(transaction, callback))]
42pub(crate) fn run_async_transaction_callback<Txn, F, T, E>(
43 transaction: Txn,
44 callback: F,
45) -> Result<T, TransactionError<E>>
46where
47 Txn: TransactionSession,
48 F: for<'b> FnOnce(&'b Txn) -> Result<T, E>,
49 E: std::fmt::Display + std::fmt::Debug,
50{
51 let res = callback(&transaction).map_err(TransactionError::Transaction);
52 if res.is_ok() {
53 transaction.commit().map_err(TransactionError::Connection)?;
54 } else {
55 transaction
56 .rollback()
57 .map_err(TransactionError::Connection)?;
58 }
59 res
60}
61
62impl std::fmt::Debug for DatabaseTransaction {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 write!(f, "DatabaseTransaction")
65 }
66}
67
68impl DatabaseTransaction {
69 #[instrument(level = "trace", skip(metric_callback))]
70 pub(crate) fn begin(
71 conn: Arc<Mutex<InnerConnection>>,
72 backend: DbBackend,
73 metric_callback: Option<crate::metric::Callback>,
74 record_stmt_in_spans: bool,
75 isolation_level: Option<IsolationLevel>,
76 access_mode: Option<AccessMode>,
77 sqlite_transaction_mode: Option<SqliteTransactionMode>,
78 ) -> Result<DatabaseTransaction, DbErr> {
79 let res = DatabaseTransaction {
80 conn,
81 backend,
82 open: true,
83 metric_callback,
84 record_stmt_in_spans,
85 };
86
87 let begin_result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
88 "sea_orm.begin",
89 backend,
90 "BEGIN",
91 record_stmt = false,
92 {
93 #[cfg(not(feature = "sync"))]
94 let conn = &mut *res.conn.lock();
95 #[cfg(feature = "sync")]
96 let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
97
98 match conn {
99 #[cfg(feature = "sqlx-mysql")]
100 InnerConnection::MySql(c) => {
101 crate::driver::sqlx_mysql::set_transaction_config(
103 c,
104 isolation_level,
105 access_mode,
106 )?;
107 <sqlx::MySql as sqlx::Database>::TransactionManager::begin(c, None)
108 .map_err(sqlx_error_to_query_err)
109 }
110 #[cfg(feature = "sqlx-postgres")]
111 InnerConnection::Postgres(c) => {
112 <sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c, None)
113 .map_err(sqlx_error_to_query_err)?;
114 crate::driver::sqlx_postgres::set_transaction_config(
116 c,
117 isolation_level,
118 access_mode,
119 )
120 }
121 #[cfg(feature = "sqlx-sqlite")]
122 InnerConnection::Sqlite(c) => {
123 crate::driver::sqlx_sqlite::set_transaction_config(
124 c,
125 isolation_level,
126 access_mode,
127 )?;
128 let depth = <sqlx::Sqlite as sqlx::Database>::TransactionManager::get_transaction_depth(c);
129 let statement = if depth == 0 {
130 sqlite_transaction_mode.map(|mode| {
131 sqlx::AssertSqlSafe(format!("BEGIN {}", mode.sqlite_keyword()))
132 .into_sql_str()
133 })
134 } else {
135 None
137 };
138 <sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
139 .map_err(sqlx_error_to_query_err)
140 }
141 #[cfg(feature = "rusqlite")]
142 InnerConnection::Rusqlite(c) => c.begin(sqlite_transaction_mode),
143 #[cfg(feature = "mock")]
144 InnerConnection::Mock(c) => {
145 c.begin();
146 Ok(())
147 }
148 #[cfg(feature = "proxy")]
149 InnerConnection::Proxy(c) => {
150 c.begin();
151 Ok(())
152 }
153 #[allow(unreachable_patterns)]
154 _ => Err(conn_err("Disconnected")),
155 }
156 }
157 );
158
159 begin_result?;
160 Ok(res)
161 }
162
163 #[instrument(level = "trace", skip(callback))]
166 pub(crate) fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
167 where
168 F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
169 E: std::fmt::Display + std::fmt::Debug,
170 {
171 let res = callback(&self).map_err(TransactionError::Transaction);
172 if res.is_ok() {
173 self.commit().map_err(TransactionError::Connection)?;
174 } else {
175 self.rollback().map_err(TransactionError::Connection)?;
176 }
177 res
178 }
179
180 #[instrument(level = "trace", skip(callback))]
184 pub fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
185 where
186 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
187 E: std::fmt::Display + std::fmt::Debug,
188 {
189 let transaction = self.begin().map_err(TransactionError::Connection)?;
190 run_async_transaction_callback(transaction, callback)
191 }
192
193 #[instrument(level = "trace", skip(callback))]
197 pub fn transaction_with_config<F, T, E>(
198 &self,
199 callback: F,
200 isolation_level: Option<IsolationLevel>,
201 access_mode: Option<AccessMode>,
202 ) -> Result<T, TransactionError<E>>
203 where
204 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
205 E: std::fmt::Display + std::fmt::Debug,
206 {
207 let transaction = self
208 .begin_with_config(isolation_level, access_mode)
209 .map_err(TransactionError::Connection)?;
210 run_async_transaction_callback(transaction, callback)
211 }
212
213 #[instrument(level = "trace")]
215 #[allow(unreachable_code, unused_mut)]
216 pub fn commit(mut self) -> Result<(), DbErr> {
217 let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
218 "sea_orm.commit",
219 self.backend,
220 "COMMIT",
221 record_stmt = false,
222 {
223 #[cfg(not(feature = "sync"))]
224 let conn = &mut *self.conn.lock();
225 #[cfg(feature = "sync")]
226 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
227
228 match conn {
229 #[cfg(feature = "sqlx-mysql")]
230 InnerConnection::MySql(c) => {
231 <sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
232 .map_err(sqlx_error_to_query_err)
233 }
234 #[cfg(feature = "sqlx-postgres")]
235 InnerConnection::Postgres(c) => {
236 <sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
237 .map_err(sqlx_error_to_query_err)
238 }
239 #[cfg(feature = "sqlx-sqlite")]
240 InnerConnection::Sqlite(c) => {
241 <sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
242 .map_err(sqlx_error_to_query_err)
243 }
244 #[cfg(feature = "rusqlite")]
245 InnerConnection::Rusqlite(c) => c.commit(),
246 #[cfg(feature = "mock")]
247 InnerConnection::Mock(c) => {
248 c.commit();
249 Ok(())
250 }
251 #[cfg(feature = "proxy")]
252 InnerConnection::Proxy(c) => {
253 c.commit();
254 Ok(())
255 }
256 #[allow(unreachable_patterns)]
257 _ => Err(conn_err("Disconnected")),
258 }
259 }
260 );
261
262 result?;
263 self.open = false; Ok(())
265 }
266
267 #[instrument(level = "trace")]
269 #[allow(unreachable_code, unused_mut)]
270 pub fn rollback(mut self) -> Result<(), DbErr> {
271 let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
272 "sea_orm.rollback",
273 self.backend,
274 "ROLLBACK",
275 record_stmt = false,
276 {
277 #[cfg(not(feature = "sync"))]
278 let conn = &mut *self.conn.lock();
279 #[cfg(feature = "sync")]
280 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
281
282 match conn {
283 #[cfg(feature = "sqlx-mysql")]
284 InnerConnection::MySql(c) => {
285 <sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
286 .map_err(sqlx_error_to_query_err)
287 }
288 #[cfg(feature = "sqlx-postgres")]
289 InnerConnection::Postgres(c) => {
290 <sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
291 .map_err(sqlx_error_to_query_err)
292 }
293 #[cfg(feature = "sqlx-sqlite")]
294 InnerConnection::Sqlite(c) => {
295 <sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
296 .map_err(sqlx_error_to_query_err)
297 }
298 #[cfg(feature = "rusqlite")]
299 InnerConnection::Rusqlite(c) => c.rollback(),
300 #[cfg(feature = "mock")]
301 InnerConnection::Mock(c) => {
302 c.rollback();
303 Ok(())
304 }
305 #[cfg(feature = "proxy")]
306 InnerConnection::Proxy(c) => {
307 c.rollback();
308 Ok(())
309 }
310 #[allow(unreachable_patterns)]
311 _ => Err(conn_err("Disconnected")),
312 }
313 }
314 );
315
316 result?;
317 self.open = false; Ok(())
319 }
320
321 #[instrument(level = "trace")]
323 fn start_rollback(&mut self) -> Result<(), DbErr> {
324 if self.open {
325 if let Some(mut conn) = self.conn.try_lock().ok() {
326 match &mut *conn {
327 #[cfg(feature = "sqlx-mysql")]
328 InnerConnection::MySql(c) => {
329 <sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
330 }
331 #[cfg(feature = "sqlx-postgres")]
332 InnerConnection::Postgres(c) => {
333 <sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
334 }
335 #[cfg(feature = "sqlx-sqlite")]
336 InnerConnection::Sqlite(c) => {
337 <sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
338 }
339 #[cfg(feature = "rusqlite")]
340 InnerConnection::Rusqlite(c) => {
341 c.start_rollback()?;
342 }
343 #[cfg(feature = "mock")]
344 InnerConnection::Mock(c) => {
345 c.rollback();
346 }
347 #[cfg(feature = "proxy")]
348 InnerConnection::Proxy(c) => {
349 c.start_rollback();
350 }
351 #[allow(unreachable_patterns)]
352 _ => return Err(conn_err("Disconnected")),
353 }
354 } else {
355 return Err(conn_err("Dropping a locked Transaction"));
357 }
358 }
359 Ok(())
360 }
361}
362
363impl TransactionSession for DatabaseTransaction {
364 fn commit(self) -> Result<(), DbErr> {
365 self.commit()
366 }
367
368 fn rollback(self) -> Result<(), DbErr> {
369 self.rollback()
370 }
371}
372
373impl Drop for DatabaseTransaction {
374 fn drop(&mut self) {
375 self.start_rollback().expect("Fail to rollback transaction");
376 }
377}
378
379impl ConnectionTrait for DatabaseTransaction {
380 fn get_database_backend(&self) -> DbBackend {
381 self.backend
383 }
384
385 #[instrument(level = "trace", skip(stmt))]
386 #[allow(unused_variables)]
387 fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
388 debug_print!("{}", stmt);
389
390 super::tracing_spans::with_db_span!(
391 "sea_orm.execute",
392 self.backend,
393 stmt.sql.as_str(),
394 record_stmt = self.record_stmt_in_spans,
395 {
396 #[cfg(not(feature = "sync"))]
397 let conn = &mut *self.conn.lock();
398 #[cfg(feature = "sync")]
399 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
400
401 match conn {
402 #[cfg(feature = "sqlx-mysql")]
403 InnerConnection::MySql(conn) => {
404 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
405 let conn: &mut sqlx::MySqlConnection = &mut *conn;
406 crate::metric::metric!(self.metric_callback, &stmt, {
407 query.execute(conn).map(Into::into)
408 })
409 .map_err(sqlx_error_to_exec_err)
410 }
411 #[cfg(feature = "sqlx-postgres")]
412 InnerConnection::Postgres(conn) => {
413 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
414 let conn: &mut sqlx::PgConnection = &mut *conn;
415 crate::metric::metric!(self.metric_callback, &stmt, {
416 query.execute(conn).map(Into::into)
417 })
418 .map_err(sqlx_error_to_exec_err)
419 }
420 #[cfg(feature = "sqlx-sqlite")]
421 InnerConnection::Sqlite(conn) => {
422 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
423 let conn: &mut sqlx::SqliteConnection = &mut *conn;
424 crate::metric::metric!(self.metric_callback, &stmt, {
425 query.execute(conn).map(Into::into)
426 })
427 .map_err(sqlx_error_to_exec_err)
428 }
429 #[cfg(feature = "rusqlite")]
430 InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback),
431 #[cfg(feature = "mock")]
432 InnerConnection::Mock(conn) => conn.execute(stmt),
433 #[cfg(feature = "proxy")]
434 InnerConnection::Proxy(conn) => conn.execute(stmt),
435 #[allow(unreachable_patterns)]
436 _ => Err(conn_err("Disconnected")),
437 }
438 }
439 )
440 }
441
442 #[instrument(level = "trace", skip(sql))]
443 #[allow(unused_variables)]
444 fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
445 debug_print!("{}", sql);
446
447 super::tracing_spans::with_db_span!(
448 "sea_orm.execute_unprepared",
449 self.backend,
450 sql,
451 record_stmt = false,
452 {
453 #[cfg(not(feature = "sync"))]
454 let conn = &mut *self.conn.lock();
455 #[cfg(feature = "sync")]
456 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
457
458 match conn {
459 #[cfg(feature = "sqlx-mysql")]
460 InnerConnection::MySql(conn) => {
461 let conn: &mut sqlx::MySqlConnection = &mut *conn;
462 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
463 .map(Into::into)
464 .map_err(sqlx_error_to_exec_err)
465 }
466 #[cfg(feature = "sqlx-postgres")]
467 InnerConnection::Postgres(conn) => {
468 let conn: &mut sqlx::PgConnection = &mut *conn;
469 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
470 .map(Into::into)
471 .map_err(sqlx_error_to_exec_err)
472 }
473 #[cfg(feature = "sqlx-sqlite")]
474 InnerConnection::Sqlite(conn) => {
475 let conn: &mut sqlx::SqliteConnection = &mut *conn;
476 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
477 .map(Into::into)
478 .map_err(sqlx_error_to_exec_err)
479 }
480 #[cfg(feature = "rusqlite")]
481 InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql),
482 #[cfg(feature = "mock")]
483 InnerConnection::Mock(conn) => {
484 let db_backend = conn.get_database_backend();
485 let stmt = Statement::from_string(db_backend, sql);
486 conn.execute(stmt)
487 }
488 #[cfg(feature = "proxy")]
489 InnerConnection::Proxy(conn) => {
490 let db_backend = conn.get_database_backend();
491 let stmt = Statement::from_string(db_backend, sql);
492 conn.execute(stmt)
493 }
494 #[allow(unreachable_patterns)]
495 _ => Err(conn_err("Disconnected")),
496 }
497 }
498 )
499 }
500
501 #[instrument(level = "trace", skip(stmt))]
502 #[allow(unused_variables)]
503 fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
504 debug_print!("{}", stmt);
505
506 super::tracing_spans::with_db_span!(
507 "sea_orm.query_one",
508 self.backend,
509 stmt.sql.as_str(),
510 record_stmt = self.record_stmt_in_spans,
511 {
512 #[cfg(not(feature = "sync"))]
513 let conn = &mut *self.conn.lock();
514 #[cfg(feature = "sync")]
515 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
516
517 match conn {
518 #[cfg(feature = "sqlx-mysql")]
519 InnerConnection::MySql(conn) => {
520 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
521 let conn: &mut sqlx::MySqlConnection = &mut *conn;
522 crate::metric::metric!(self.metric_callback, &stmt, {
523 crate::sqlx_map_err_ignore_not_found(
524 query.fetch_one(conn).map(|row| Some(row.into())),
525 )
526 })
527 }
528 #[cfg(feature = "sqlx-postgres")]
529 InnerConnection::Postgres(conn) => {
530 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
531 let conn: &mut sqlx::PgConnection = &mut *conn;
532 crate::metric::metric!(self.metric_callback, &stmt, {
533 crate::sqlx_map_err_ignore_not_found(
534 query.fetch_one(conn).map(|row| Some(row.into())),
535 )
536 })
537 }
538 #[cfg(feature = "sqlx-sqlite")]
539 InnerConnection::Sqlite(conn) => {
540 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
541 let conn: &mut sqlx::SqliteConnection = &mut *conn;
542 crate::metric::metric!(self.metric_callback, &stmt, {
543 crate::sqlx_map_err_ignore_not_found(
544 query.fetch_one(conn).map(|row| Some(row.into())),
545 )
546 })
547 }
548 #[cfg(feature = "rusqlite")]
549 InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback),
550 #[cfg(feature = "mock")]
551 InnerConnection::Mock(conn) => conn.query_one(stmt),
552 #[cfg(feature = "proxy")]
553 InnerConnection::Proxy(conn) => conn.query_one(stmt),
554 #[allow(unreachable_patterns)]
555 _ => Err(conn_err("Disconnected")),
556 }
557 }
558 )
559 }
560
561 #[instrument(level = "trace", skip(stmt))]
562 #[allow(unused_variables)]
563 fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
564 debug_print!("{}", stmt);
565
566 super::tracing_spans::with_db_span!(
567 "sea_orm.query_all",
568 self.backend,
569 stmt.sql.as_str(),
570 record_stmt = self.record_stmt_in_spans,
571 {
572 #[cfg(not(feature = "sync"))]
573 let conn = &mut *self.conn.lock();
574 #[cfg(feature = "sync")]
575 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
576
577 match conn {
578 #[cfg(feature = "sqlx-mysql")]
579 InnerConnection::MySql(conn) => {
580 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
581 let conn: &mut sqlx::MySqlConnection = &mut *conn;
582 crate::metric::metric!(self.metric_callback, &stmt, {
583 query
584 .fetch_all(conn)
585 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
586 .map_err(sqlx_error_to_query_err)
587 })
588 }
589 #[cfg(feature = "sqlx-postgres")]
590 InnerConnection::Postgres(conn) => {
591 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
592 let conn: &mut sqlx::PgConnection = &mut *conn;
593 crate::metric::metric!(self.metric_callback, &stmt, {
594 query
595 .fetch_all(conn)
596 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
597 .map_err(sqlx_error_to_query_err)
598 })
599 }
600 #[cfg(feature = "sqlx-sqlite")]
601 InnerConnection::Sqlite(conn) => {
602 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
603 let conn: &mut sqlx::SqliteConnection = &mut *conn;
604 crate::metric::metric!(self.metric_callback, &stmt, {
605 query
606 .fetch_all(conn)
607 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
608 .map_err(sqlx_error_to_query_err)
609 })
610 }
611 #[cfg(feature = "rusqlite")]
612 InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback),
613 #[cfg(feature = "mock")]
614 InnerConnection::Mock(conn) => conn.query_all(stmt),
615 #[cfg(feature = "proxy")]
616 InnerConnection::Proxy(conn) => conn.query_all(stmt),
617 #[allow(unreachable_patterns)]
618 _ => Err(conn_err("Disconnected")),
619 }
620 }
621 )
622 }
623}
624
/// Streaming support: rows are produced by a [`TransactionStream`] that is
/// built from the locked connection of this transaction.
#[cfg(feature = "stream")]
impl StreamTrait for DatabaseTransaction {
    type Stream<'a> = TransactionStream<'a>;

    /// Returns the backend this transaction was created for.
    fn get_database_backend(&self) -> DbBackend {
        let Self { backend, .. } = self;
        *backend
    }

    /// Locks the connection and hands the guard, the statement and a clone of
    /// the metric callback to `TransactionStream::build`.
    ///
    /// # Errors
    ///
    /// Returns `DbErr::MutexPoisonError` (with the `sync` feature) when the
    /// connection mutex is poisoned.
    #[instrument(level = "trace", skip(stmt))]
    fn stream_raw<'a>(&'a self, stmt: Statement) -> Result<Self::Stream<'a>, DbErr> {
        #[cfg(not(feature = "sync"))]
        let guard = self.conn.lock();
        #[cfg(feature = "sync")]
        let guard = self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

        let metric_callback = self.metric_callback.clone();
        let stream = crate::TransactionStream::build(guard, stmt, metric_callback);
        Ok(stream)
    }
}
648
649impl TransactionTrait for DatabaseTransaction {
650 type Transaction = DatabaseTransaction;
651
652 #[instrument(level = "trace")]
653 fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
654 DatabaseTransaction::begin(
655 Arc::clone(&self.conn),
656 self.backend,
657 self.metric_callback.clone(),
658 self.record_stmt_in_spans,
659 None,
660 None,
661 None,
662 )
663 }
664
665 #[instrument(level = "trace")]
666 fn begin_with_config(
667 &self,
668 isolation_level: Option<IsolationLevel>,
669 access_mode: Option<AccessMode>,
670 ) -> Result<DatabaseTransaction, DbErr> {
671 DatabaseTransaction::begin(
672 Arc::clone(&self.conn),
673 self.backend,
674 self.metric_callback.clone(),
675 self.record_stmt_in_spans,
676 isolation_level,
677 access_mode,
678 None,
679 )
680 }
681
682 #[instrument(level = "trace")]
683 fn begin_with_options(
684 &self,
685 options: TransactionOptions,
686 ) -> Result<DatabaseTransaction, DbErr> {
687 DatabaseTransaction::begin(
688 Arc::clone(&self.conn),
689 self.backend,
690 self.metric_callback.clone(),
691 self.record_stmt_in_spans,
692 options.isolation_level,
693 options.access_mode,
694 options.sqlite_transaction_mode,
695 )
696 }
697
698 #[instrument(level = "trace", skip(_callback))]
702 fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
703 where
704 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
705 E: std::fmt::Display + std::fmt::Debug,
706 {
707 let transaction = self.begin().map_err(TransactionError::Connection)?;
708 transaction.run(_callback)
709 }
710
711 #[instrument(level = "trace", skip(_callback))]
715 fn transaction_with_config<F, T, E>(
716 &self,
717 _callback: F,
718 isolation_level: Option<IsolationLevel>,
719 access_mode: Option<AccessMode>,
720 ) -> Result<T, TransactionError<E>>
721 where
722 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
723 E: std::fmt::Display + std::fmt::Debug,
724 {
725 let transaction = self
726 .begin_with_config(isolation_level, access_mode)
727 .map_err(TransactionError::Connection)?;
728 transaction.run(_callback)
729 }
730}
731
/// Error returned by the callback-style transaction helpers.
///
/// Distinguishes failures of the transaction machinery itself from errors
/// produced by the user-supplied callback.
#[derive(Debug)]
pub enum TransactionError<E> {
    /// Beginning, committing or rolling back the transaction failed.
    Connection(DbErr),
    /// The callback returned this error.
    Transaction(E),
}
742
743impl<E> std::fmt::Display for TransactionError<E>
744where
745 E: std::fmt::Display + std::fmt::Debug,
746{
747 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
748 match self {
749 TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
750 TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
751 }
752 }
753}
754
/// Marks [`TransactionError`] as a standard error type; only the trait's default methods are used.
impl<E> std::error::Error for TransactionError<E> where E: std::fmt::Display + std::fmt::Debug {}
756
757impl<E> From<DbErr> for TransactionError<E>
758where
759 E: std::fmt::Display + std::fmt::Debug,
760{
761 fn from(e: DbErr) -> Self {
762 Self::Connection(e)
763 }
764}