Skip to main content

sea_orm/database/
transaction.rs

1#![allow(unused_assignments)]
2use std::sync::Arc;
3
4#[cfg(feature = "sqlx-dep")]
5use sqlx::TransactionManager;
6use std::sync::Mutex;
7use tracing::instrument;
8
9use crate::{
10    AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
11    QueryResult, SqliteTransactionMode, Statement, StreamTrait, TransactionOptions,
12    TransactionSession, TransactionStream, TransactionTrait, debug_print, error::*,
13};
14#[cfg(feature = "sqlx-dep")]
15use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
16
17/// Defines a database transaction, whether it is an open transaction and the type of
18/// backend to use.
19/// Under the hood, a Transaction is just a wrapper for a connection where
20/// START TRANSACTION has been executed.
21pub struct DatabaseTransaction {
22    conn: Arc<Mutex<InnerConnection>>,
23    backend: DbBackend,
24    open: bool,
25    metric_callback: Option<crate::metric::Callback>,
26}
27
28impl std::fmt::Debug for DatabaseTransaction {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "DatabaseTransaction")
31    }
32}
33
34impl DatabaseTransaction {
35    #[instrument(level = "trace", skip(metric_callback))]
36    pub(crate) fn begin(
37        conn: Arc<Mutex<InnerConnection>>,
38        backend: DbBackend,
39        metric_callback: Option<crate::metric::Callback>,
40        isolation_level: Option<IsolationLevel>,
41        access_mode: Option<AccessMode>,
42        sqlite_transaction_mode: Option<SqliteTransactionMode>,
43    ) -> Result<DatabaseTransaction, DbErr> {
44        let res = DatabaseTransaction {
45            conn,
46            backend,
47            open: true,
48            metric_callback,
49        };
50
51        let begin_result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
52            "sea_orm.begin",
53            backend,
54            "BEGIN",
55            record_stmt = false,
56            {
57                #[cfg(not(feature = "sync"))]
58                let conn = &mut *res.conn.lock();
59                #[cfg(feature = "sync")]
60                let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
61
62                match conn {
63                    #[cfg(feature = "sqlx-mysql")]
64                    InnerConnection::MySql(c) => {
65                        // in MySQL SET TRANSACTION operations must be executed before transaction start
66                        crate::driver::sqlx_mysql::set_transaction_config(
67                            c,
68                            isolation_level,
69                            access_mode,
70                        )?;
71                        <sqlx::MySql as sqlx::Database>::TransactionManager::begin(c, None)
72                            .map_err(sqlx_error_to_query_err)
73                    }
74                    #[cfg(feature = "sqlx-postgres")]
75                    InnerConnection::Postgres(c) => {
76                        <sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c, None)
77                            .map_err(sqlx_error_to_query_err)?;
78                        // in PostgreSQL SET TRANSACTION operations must be executed inside transaction
79                        crate::driver::sqlx_postgres::set_transaction_config(
80                            c,
81                            isolation_level,
82                            access_mode,
83                        )
84                    }
85                    #[cfg(feature = "sqlx-sqlite")]
86                    InnerConnection::Sqlite(c) => {
87                        crate::driver::sqlx_sqlite::set_transaction_config(
88                            c,
89                            isolation_level,
90                            access_mode,
91                        )?;
92                        let depth = <sqlx::Sqlite as sqlx::Database>::TransactionManager::get_transaction_depth(c);
93                        let statement = if depth == 0 {
94                            sqlite_transaction_mode.map(|mode| {
95                                std::borrow::Cow::from(format!("BEGIN {}", mode.sqlite_keyword()))
96                            })
97                        } else {
98                            // Nested transaction uses SAVEPOINT; the mode only applies to the top-level BEGIN
99                            None
100                        };
101                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
102                            .map_err(sqlx_error_to_query_err)
103                    }
104                    #[cfg(feature = "rusqlite")]
105                    InnerConnection::Rusqlite(c) => c.begin(sqlite_transaction_mode),
106                    #[cfg(feature = "mock")]
107                    InnerConnection::Mock(c) => {
108                        c.begin();
109                        Ok(())
110                    }
111                    #[cfg(feature = "proxy")]
112                    InnerConnection::Proxy(c) => {
113                        c.begin();
114                        Ok(())
115                    }
116                    #[allow(unreachable_patterns)]
117                    _ => Err(conn_err("Disconnected")),
118                }
119            }
120        );
121
122        begin_result?;
123        Ok(res)
124    }
125
126    /// Runs a transaction to completion passing through the result.
127    /// Rolling back the transaction on encountering an error.
128    #[instrument(level = "trace", skip(callback))]
129    pub(crate) fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
130    where
131        F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
132        E: std::fmt::Display + std::fmt::Debug,
133    {
134        let res = callback(&self).map_err(TransactionError::Transaction);
135        if res.is_ok() {
136            self.commit().map_err(TransactionError::Connection)?;
137        } else {
138            self.rollback().map_err(TransactionError::Connection)?;
139        }
140        res
141    }
142
143    /// Commit a transaction
144    #[instrument(level = "trace")]
145    #[allow(unreachable_code, unused_mut)]
146    pub fn commit(mut self) -> Result<(), DbErr> {
147        let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
148            "sea_orm.commit",
149            self.backend,
150            "COMMIT",
151            record_stmt = false,
152            {
153                #[cfg(not(feature = "sync"))]
154                let conn = &mut *self.conn.lock();
155                #[cfg(feature = "sync")]
156                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
157
158                match conn {
159                    #[cfg(feature = "sqlx-mysql")]
160                    InnerConnection::MySql(c) => {
161                        <sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
162                            .map_err(sqlx_error_to_query_err)
163                    }
164                    #[cfg(feature = "sqlx-postgres")]
165                    InnerConnection::Postgres(c) => {
166                        <sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
167                            .map_err(sqlx_error_to_query_err)
168                    }
169                    #[cfg(feature = "sqlx-sqlite")]
170                    InnerConnection::Sqlite(c) => {
171                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
172                            .map_err(sqlx_error_to_query_err)
173                    }
174                    #[cfg(feature = "rusqlite")]
175                    InnerConnection::Rusqlite(c) => c.commit(),
176                    #[cfg(feature = "mock")]
177                    InnerConnection::Mock(c) => {
178                        c.commit();
179                        Ok(())
180                    }
181                    #[cfg(feature = "proxy")]
182                    InnerConnection::Proxy(c) => {
183                        c.commit();
184                        Ok(())
185                    }
186                    #[allow(unreachable_patterns)]
187                    _ => Err(conn_err("Disconnected")),
188                }
189            }
190        );
191
192        result?;
193        self.open = false; // read by start_rollback
194        Ok(())
195    }
196
197    /// Rolls back a transaction explicitly
198    #[instrument(level = "trace")]
199    #[allow(unreachable_code, unused_mut)]
200    pub fn rollback(mut self) -> Result<(), DbErr> {
201        let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
202            "sea_orm.rollback",
203            self.backend,
204            "ROLLBACK",
205            record_stmt = false,
206            {
207                #[cfg(not(feature = "sync"))]
208                let conn = &mut *self.conn.lock();
209                #[cfg(feature = "sync")]
210                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
211
212                match conn {
213                    #[cfg(feature = "sqlx-mysql")]
214                    InnerConnection::MySql(c) => {
215                        <sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
216                            .map_err(sqlx_error_to_query_err)
217                    }
218                    #[cfg(feature = "sqlx-postgres")]
219                    InnerConnection::Postgres(c) => {
220                        <sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
221                            .map_err(sqlx_error_to_query_err)
222                    }
223                    #[cfg(feature = "sqlx-sqlite")]
224                    InnerConnection::Sqlite(c) => {
225                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
226                            .map_err(sqlx_error_to_query_err)
227                    }
228                    #[cfg(feature = "rusqlite")]
229                    InnerConnection::Rusqlite(c) => c.rollback(),
230                    #[cfg(feature = "mock")]
231                    InnerConnection::Mock(c) => {
232                        c.rollback();
233                        Ok(())
234                    }
235                    #[cfg(feature = "proxy")]
236                    InnerConnection::Proxy(c) => {
237                        c.rollback();
238                        Ok(())
239                    }
240                    #[allow(unreachable_patterns)]
241                    _ => Err(conn_err("Disconnected")),
242                }
243            }
244        );
245
246        result?;
247        self.open = false; // read by start_rollback
248        Ok(())
249    }
250
251    // the rollback is queued and will be performed on next operation, like returning the connection to the pool
252    #[instrument(level = "trace")]
253    fn start_rollback(&mut self) -> Result<(), DbErr> {
254        if self.open {
255            if let Some(mut conn) = self.conn.try_lock().ok() {
256                match &mut *conn {
257                    #[cfg(feature = "sqlx-mysql")]
258                    InnerConnection::MySql(c) => {
259                        <sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
260                    }
261                    #[cfg(feature = "sqlx-postgres")]
262                    InnerConnection::Postgres(c) => {
263                        <sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
264                    }
265                    #[cfg(feature = "sqlx-sqlite")]
266                    InnerConnection::Sqlite(c) => {
267                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
268                    }
269                    #[cfg(feature = "rusqlite")]
270                    InnerConnection::Rusqlite(c) => {
271                        c.start_rollback()?;
272                    }
273                    #[cfg(feature = "mock")]
274                    InnerConnection::Mock(c) => {
275                        c.rollback();
276                    }
277                    #[cfg(feature = "proxy")]
278                    InnerConnection::Proxy(c) => {
279                        c.start_rollback();
280                    }
281                    #[allow(unreachable_patterns)]
282                    _ => return Err(conn_err("Disconnected")),
283                }
284            } else {
285                //this should never happen
286                return Err(conn_err("Dropping a locked Transaction"));
287            }
288        }
289        Ok(())
290    }
291}
292
293impl TransactionSession for DatabaseTransaction {
294    fn commit(self) -> Result<(), DbErr> {
295        self.commit()
296    }
297
298    fn rollback(self) -> Result<(), DbErr> {
299        self.rollback()
300    }
301}
302
303impl Drop for DatabaseTransaction {
304    fn drop(&mut self) {
305        self.start_rollback().expect("Fail to rollback transaction");
306    }
307}
308
309impl ConnectionTrait for DatabaseTransaction {
    /// Returns the backend stored on this transaction.
    fn get_database_backend(&self) -> DbBackend {
        // this way we don't need to lock just to know the backend
        self.backend
    }
314
315    #[instrument(level = "trace")]
316    #[allow(unused_variables)]
317    fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
318        debug_print!("{}", stmt);
319
320        super::tracing_spans::with_db_span!(
321            "sea_orm.execute",
322            self.backend,
323            stmt.sql.as_str(),
324            record_stmt = true,
325            {
326                #[cfg(not(feature = "sync"))]
327                let conn = &mut *self.conn.lock();
328                #[cfg(feature = "sync")]
329                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
330
331                match conn {
332                    #[cfg(feature = "sqlx-mysql")]
333                    InnerConnection::MySql(conn) => {
334                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
335                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
336                        crate::metric::metric!(self.metric_callback, &stmt, {
337                            query.execute(conn).map(Into::into)
338                        })
339                        .map_err(sqlx_error_to_exec_err)
340                    }
341                    #[cfg(feature = "sqlx-postgres")]
342                    InnerConnection::Postgres(conn) => {
343                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
344                        let conn: &mut sqlx::PgConnection = &mut *conn;
345                        crate::metric::metric!(self.metric_callback, &stmt, {
346                            query.execute(conn).map(Into::into)
347                        })
348                        .map_err(sqlx_error_to_exec_err)
349                    }
350                    #[cfg(feature = "sqlx-sqlite")]
351                    InnerConnection::Sqlite(conn) => {
352                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
353                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
354                        crate::metric::metric!(self.metric_callback, &stmt, {
355                            query.execute(conn).map(Into::into)
356                        })
357                        .map_err(sqlx_error_to_exec_err)
358                    }
359                    #[cfg(feature = "rusqlite")]
360                    InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback),
361                    #[cfg(feature = "mock")]
362                    InnerConnection::Mock(conn) => conn.execute(stmt),
363                    #[cfg(feature = "proxy")]
364                    InnerConnection::Proxy(conn) => conn.execute(stmt),
365                    #[allow(unreachable_patterns)]
366                    _ => Err(conn_err("Disconnected")),
367                }
368            }
369        )
370    }
371
372    #[instrument(level = "trace")]
373    #[allow(unused_variables)]
374    fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
375        debug_print!("{}", sql);
376
377        super::tracing_spans::with_db_span!(
378            "sea_orm.execute_unprepared",
379            self.backend,
380            sql,
381            record_stmt = false,
382            {
383                #[cfg(not(feature = "sync"))]
384                let conn = &mut *self.conn.lock();
385                #[cfg(feature = "sync")]
386                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
387
388                match conn {
389                    #[cfg(feature = "sqlx-mysql")]
390                    InnerConnection::MySql(conn) => {
391                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
392                        sqlx::Executor::execute(conn, sql)
393                            .map(Into::into)
394                            .map_err(sqlx_error_to_exec_err)
395                    }
396                    #[cfg(feature = "sqlx-postgres")]
397                    InnerConnection::Postgres(conn) => {
398                        let conn: &mut sqlx::PgConnection = &mut *conn;
399                        sqlx::Executor::execute(conn, sql)
400                            .map(Into::into)
401                            .map_err(sqlx_error_to_exec_err)
402                    }
403                    #[cfg(feature = "sqlx-sqlite")]
404                    InnerConnection::Sqlite(conn) => {
405                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
406                        sqlx::Executor::execute(conn, sql)
407                            .map(Into::into)
408                            .map_err(sqlx_error_to_exec_err)
409                    }
410                    #[cfg(feature = "rusqlite")]
411                    InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql),
412                    #[cfg(feature = "mock")]
413                    InnerConnection::Mock(conn) => {
414                        let db_backend = conn.get_database_backend();
415                        let stmt = Statement::from_string(db_backend, sql);
416                        conn.execute(stmt)
417                    }
418                    #[cfg(feature = "proxy")]
419                    InnerConnection::Proxy(conn) => {
420                        let db_backend = conn.get_database_backend();
421                        let stmt = Statement::from_string(db_backend, sql);
422                        conn.execute(stmt)
423                    }
424                    #[allow(unreachable_patterns)]
425                    _ => Err(conn_err("Disconnected")),
426                }
427            }
428        )
429    }
430
431    #[instrument(level = "trace")]
432    #[allow(unused_variables)]
433    fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
434        debug_print!("{}", stmt);
435
436        super::tracing_spans::with_db_span!(
437            "sea_orm.query_one",
438            self.backend,
439            stmt.sql.as_str(),
440            record_stmt = true,
441            {
442                #[cfg(not(feature = "sync"))]
443                let conn = &mut *self.conn.lock();
444                #[cfg(feature = "sync")]
445                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
446
447                match conn {
448                    #[cfg(feature = "sqlx-mysql")]
449                    InnerConnection::MySql(conn) => {
450                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
451                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
452                        crate::metric::metric!(self.metric_callback, &stmt, {
453                            crate::sqlx_map_err_ignore_not_found(
454                                query.fetch_one(conn).map(|row| Some(row.into())),
455                            )
456                        })
457                    }
458                    #[cfg(feature = "sqlx-postgres")]
459                    InnerConnection::Postgres(conn) => {
460                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
461                        let conn: &mut sqlx::PgConnection = &mut *conn;
462                        crate::metric::metric!(self.metric_callback, &stmt, {
463                            crate::sqlx_map_err_ignore_not_found(
464                                query.fetch_one(conn).map(|row| Some(row.into())),
465                            )
466                        })
467                    }
468                    #[cfg(feature = "sqlx-sqlite")]
469                    InnerConnection::Sqlite(conn) => {
470                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
471                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
472                        crate::metric::metric!(self.metric_callback, &stmt, {
473                            crate::sqlx_map_err_ignore_not_found(
474                                query.fetch_one(conn).map(|row| Some(row.into())),
475                            )
476                        })
477                    }
478                    #[cfg(feature = "rusqlite")]
479                    InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback),
480                    #[cfg(feature = "mock")]
481                    InnerConnection::Mock(conn) => conn.query_one(stmt),
482                    #[cfg(feature = "proxy")]
483                    InnerConnection::Proxy(conn) => conn.query_one(stmt),
484                    #[allow(unreachable_patterns)]
485                    _ => Err(conn_err("Disconnected")),
486                }
487            }
488        )
489    }
490
491    #[instrument(level = "trace")]
492    #[allow(unused_variables)]
493    fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
494        debug_print!("{}", stmt);
495
496        super::tracing_spans::with_db_span!(
497            "sea_orm.query_all",
498            self.backend,
499            stmt.sql.as_str(),
500            record_stmt = true,
501            {
502                #[cfg(not(feature = "sync"))]
503                let conn = &mut *self.conn.lock();
504                #[cfg(feature = "sync")]
505                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
506
507                match conn {
508                    #[cfg(feature = "sqlx-mysql")]
509                    InnerConnection::MySql(conn) => {
510                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
511                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
512                        crate::metric::metric!(self.metric_callback, &stmt, {
513                            query
514                                .fetch_all(conn)
515                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
516                                .map_err(sqlx_error_to_query_err)
517                        })
518                    }
519                    #[cfg(feature = "sqlx-postgres")]
520                    InnerConnection::Postgres(conn) => {
521                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
522                        let conn: &mut sqlx::PgConnection = &mut *conn;
523                        crate::metric::metric!(self.metric_callback, &stmt, {
524                            query
525                                .fetch_all(conn)
526                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
527                                .map_err(sqlx_error_to_query_err)
528                        })
529                    }
530                    #[cfg(feature = "sqlx-sqlite")]
531                    InnerConnection::Sqlite(conn) => {
532                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
533                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
534                        crate::metric::metric!(self.metric_callback, &stmt, {
535                            query
536                                .fetch_all(conn)
537                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
538                                .map_err(sqlx_error_to_query_err)
539                        })
540                    }
541                    #[cfg(feature = "rusqlite")]
542                    InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback),
543                    #[cfg(feature = "mock")]
544                    InnerConnection::Mock(conn) => conn.query_all(stmt),
545                    #[cfg(feature = "proxy")]
546                    InnerConnection::Proxy(conn) => conn.query_all(stmt),
547                    #[allow(unreachable_patterns)]
548                    _ => Err(conn_err("Disconnected")),
549                }
550            }
551        )
552    }
553}
554
555impl StreamTrait for DatabaseTransaction {
556    type Stream<'a> = TransactionStream<'a>;
557
558    fn get_database_backend(&self) -> DbBackend {
559        self.backend
560    }
561
562    #[instrument(level = "trace")]
563    fn stream_raw<'a>(&'a self, stmt: Statement) -> Result<Self::Stream<'a>, DbErr> {
564        ({
565            #[cfg(not(feature = "sync"))]
566            let conn = self.conn.lock();
567            #[cfg(feature = "sync")]
568            let conn = self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
569            Ok(crate::TransactionStream::build(
570                conn,
571                stmt,
572                self.metric_callback.clone(),
573            ))
574        })
575    }
576}
577
578impl TransactionTrait for DatabaseTransaction {
579    type Transaction = DatabaseTransaction;
580
581    #[instrument(level = "trace")]
582    fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
583        DatabaseTransaction::begin(
584            Arc::clone(&self.conn),
585            self.backend,
586            self.metric_callback.clone(),
587            None,
588            None,
589            None,
590        )
591    }
592
593    #[instrument(level = "trace")]
594    fn begin_with_config(
595        &self,
596        isolation_level: Option<IsolationLevel>,
597        access_mode: Option<AccessMode>,
598    ) -> Result<DatabaseTransaction, DbErr> {
599        DatabaseTransaction::begin(
600            Arc::clone(&self.conn),
601            self.backend,
602            self.metric_callback.clone(),
603            isolation_level,
604            access_mode,
605            None,
606        )
607    }
608
609    #[instrument(level = "trace")]
610    fn begin_with_options(
611        &self,
612        options: TransactionOptions,
613    ) -> Result<DatabaseTransaction, DbErr> {
614        DatabaseTransaction::begin(
615            Arc::clone(&self.conn),
616            self.backend,
617            self.metric_callback.clone(),
618            options.isolation_level,
619            options.access_mode,
620            options.sqlite_transaction_mode,
621        )
622    }
623
624    /// Execute the function inside a transaction.
625    /// If the function returns an error, the transaction will be rolled back.
626    /// Otherwise, the transaction will be committed.
627    #[instrument(level = "trace", skip(_callback))]
628    fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
629    where
630        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
631        E: std::fmt::Display + std::fmt::Debug,
632    {
633        let transaction = self.begin().map_err(TransactionError::Connection)?;
634        transaction.run(_callback)
635    }
636
637    /// Execute the function inside a transaction.
638    /// If the function returns an error, the transaction will be rolled back.
639    /// Otherwise, the transaction will be committed.
640    #[instrument(level = "trace", skip(_callback))]
641    fn transaction_with_config<F, T, E>(
642        &self,
643        _callback: F,
644        isolation_level: Option<IsolationLevel>,
645        access_mode: Option<AccessMode>,
646    ) -> Result<T, TransactionError<E>>
647    where
648        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
649        E: std::fmt::Display + std::fmt::Debug,
650    {
651        let transaction = self
652            .begin_with_config(isolation_level, access_mode)
653            .map_err(TransactionError::Connection)?;
654        transaction.run(_callback)
655    }
656}
657
658/// Defines errors for handling transaction failures
659#[derive(Debug)]
660pub enum TransactionError<E> {
661    /// A Database connection error
662    Connection(DbErr),
663    /// An error occurring when doing database transactions
664    Transaction(E),
665}
666
667impl<E> std::fmt::Display for TransactionError<E>
668where
669    E: std::fmt::Display + std::fmt::Debug,
670{
671    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
672        match self {
673            TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
674            TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
675        }
676    }
677}
678
679impl<E> std::error::Error for TransactionError<E> where E: std::fmt::Display + std::fmt::Debug {}
680
681impl<E> From<DbErr> for TransactionError<E>
682where
683    E: std::fmt::Display + std::fmt::Debug,
684{
685    fn from(e: DbErr) -> Self {
686        Self::Connection(e)
687    }
688}