Skip to main content

sea_orm/database/
transaction.rs

1#![allow(unused_assignments)]
2use std::sync::Arc;
3
4#[cfg(feature = "sqlx-sqlite")]
5use sqlx_core::sql_str::SqlSafeStr;
6#[cfg(feature = "sqlx-dep")]
7use sqlx_core::transaction::TransactionManager;
8use std::sync::Mutex;
9use tracing::instrument;
10
11use crate::{
12    AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
13    QueryResult, SqliteTransactionMode, Statement, StreamTrait, TransactionOptions,
14    TransactionSession, TransactionStream, TransactionTrait, debug_print, error::*,
15};
16#[cfg(feature = "sqlx-dep")]
17use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
18
19/// Defines a database transaction, whether it is an open transaction and the type of
20/// backend to use.
21/// Under the hood, a Transaction is just a wrapper for a connection where
22/// START TRANSACTION has been executed.
23pub struct DatabaseTransaction {
24    conn: Arc<Mutex<InnerConnection>>,
25    backend: DbBackend,
26    open: bool,
27    metric_callback: Option<crate::metric::Callback>,
28    record_stmt_in_spans: bool,
29}
30
31#[instrument(level = "trace", skip(transaction, callback))]
32pub(crate) fn run_async_transaction_callback<Txn, F, T, E>(
33    transaction: Txn,
34    callback: F,
35) -> Result<T, TransactionError<E>>
36where
37    Txn: TransactionSession,
38    F: for<'b> FnOnce(&'b Txn) -> Result<T, E>,
39    E: std::fmt::Display + std::fmt::Debug,
40{
41    let res = callback(&transaction).map_err(TransactionError::Transaction);
42    if res.is_ok() {
43        transaction.commit().map_err(TransactionError::Connection)?;
44    } else {
45        transaction
46            .rollback()
47            .map_err(TransactionError::Connection)?;
48    }
49    res
50}
51
52impl std::fmt::Debug for DatabaseTransaction {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "DatabaseTransaction")
55    }
56}
57
58impl DatabaseTransaction {
59    #[instrument(level = "trace", skip(metric_callback))]
60    pub(crate) fn begin(
61        conn: Arc<Mutex<InnerConnection>>,
62        backend: DbBackend,
63        metric_callback: Option<crate::metric::Callback>,
64        record_stmt_in_spans: bool,
65        isolation_level: Option<IsolationLevel>,
66        access_mode: Option<AccessMode>,
67        sqlite_transaction_mode: Option<SqliteTransactionMode>,
68    ) -> Result<DatabaseTransaction, DbErr> {
69        let res = DatabaseTransaction {
70            conn,
71            backend,
72            open: true,
73            metric_callback,
74            record_stmt_in_spans,
75        };
76
77        let begin_result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
78            "sea_orm.begin",
79            backend,
80            "BEGIN",
81            record_stmt = false,
82            {
83                #[cfg(not(feature = "sync"))]
84                let conn = &mut *res.conn.lock();
85                #[cfg(feature = "sync")]
86                let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
87
88                match conn {
89                    #[cfg(feature = "sqlx-mysql")]
90                    InnerConnection::MySql(c) => {
91                        // in MySQL SET TRANSACTION operations must be executed before transaction start
92                        crate::driver::sqlx_mysql::set_transaction_config(
93                            c,
94                            isolation_level,
95                            access_mode,
96                        )?;
97                        <sqlx::MySql as sqlx::Database>::TransactionManager::begin(c, None)
98                            .map_err(sqlx_error_to_query_err)
99                    }
100                    #[cfg(feature = "sqlx-postgres")]
101                    InnerConnection::Postgres(c) => {
102                        <sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c, None)
103                            .map_err(sqlx_error_to_query_err)?;
104                        // in PostgreSQL SET TRANSACTION operations must be executed inside transaction
105                        crate::driver::sqlx_postgres::set_transaction_config(
106                            c,
107                            isolation_level,
108                            access_mode,
109                        )
110                    }
111                    #[cfg(feature = "sqlx-sqlite")]
112                    InnerConnection::Sqlite(c) => {
113                        crate::driver::sqlx_sqlite::set_transaction_config(
114                            c,
115                            isolation_level,
116                            access_mode,
117                        )?;
118                        let depth = <sqlx::Sqlite as sqlx::Database>::TransactionManager::get_transaction_depth(c);
119                        let statement = if depth == 0 {
120                            sqlite_transaction_mode.map(|mode| {
121                                sqlx::AssertSqlSafe(format!("BEGIN {}", mode.sqlite_keyword()))
122                                    .into_sql_str()
123                            })
124                        } else {
125                            // Nested transaction uses SAVEPOINT; the mode only applies to the top-level BEGIN
126                            None
127                        };
128                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
129                            .map_err(sqlx_error_to_query_err)
130                    }
131                    #[cfg(feature = "rusqlite")]
132                    InnerConnection::Rusqlite(c) => c.begin(sqlite_transaction_mode),
133                    #[cfg(feature = "mock")]
134                    InnerConnection::Mock(c) => {
135                        c.begin();
136                        Ok(())
137                    }
138                    #[cfg(feature = "proxy")]
139                    InnerConnection::Proxy(c) => {
140                        c.begin();
141                        Ok(())
142                    }
143                    #[allow(unreachable_patterns)]
144                    _ => Err(conn_err("Disconnected")),
145                }
146            }
147        );
148
149        begin_result?;
150        Ok(res)
151    }
152
153    /// Runs a transaction to completion passing through the result.
154    /// Rolling back the transaction on encountering an error.
155    #[instrument(level = "trace", skip(callback))]
156    pub(crate) fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
157    where
158        F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
159        E: std::fmt::Display + std::fmt::Debug,
160    {
161        let res = callback(&self).map_err(TransactionError::Transaction);
162        if res.is_ok() {
163            self.commit().map_err(TransactionError::Connection)?;
164        } else {
165            self.rollback().map_err(TransactionError::Connection)?;
166        }
167        res
168    }
169
170    /// Execute the function inside a transaction.
171    /// If the function returns an error, the transaction will be rolled back.
172    /// Otherwise, the transaction will be committed.
173    #[instrument(level = "trace", skip(callback))]
174    pub fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
175    where
176        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
177        E: std::fmt::Display + std::fmt::Debug,
178    {
179        let transaction = self.begin().map_err(TransactionError::Connection)?;
180        run_async_transaction_callback(transaction, callback)
181    }
182
183    /// Execute the function inside a transaction with isolation level and/or access mode.
184    /// If the function returns an error, the transaction will be rolled back.
185    /// Otherwise, the transaction will be committed.
186    #[instrument(level = "trace", skip(callback))]
187    pub fn transaction_with_config<F, T, E>(
188        &self,
189        callback: F,
190        isolation_level: Option<IsolationLevel>,
191        access_mode: Option<AccessMode>,
192    ) -> Result<T, TransactionError<E>>
193    where
194        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
195        E: std::fmt::Display + std::fmt::Debug,
196    {
197        let transaction = self
198            .begin_with_config(isolation_level, access_mode)
199            .map_err(TransactionError::Connection)?;
200        run_async_transaction_callback(transaction, callback)
201    }
202
203    /// Commit a transaction
204    #[instrument(level = "trace")]
205    #[allow(unreachable_code, unused_mut)]
206    pub fn commit(mut self) -> Result<(), DbErr> {
207        let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
208            "sea_orm.commit",
209            self.backend,
210            "COMMIT",
211            record_stmt = false,
212            {
213                #[cfg(not(feature = "sync"))]
214                let conn = &mut *self.conn.lock();
215                #[cfg(feature = "sync")]
216                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
217
218                match conn {
219                    #[cfg(feature = "sqlx-mysql")]
220                    InnerConnection::MySql(c) => {
221                        <sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
222                            .map_err(sqlx_error_to_query_err)
223                    }
224                    #[cfg(feature = "sqlx-postgres")]
225                    InnerConnection::Postgres(c) => {
226                        <sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
227                            .map_err(sqlx_error_to_query_err)
228                    }
229                    #[cfg(feature = "sqlx-sqlite")]
230                    InnerConnection::Sqlite(c) => {
231                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
232                            .map_err(sqlx_error_to_query_err)
233                    }
234                    #[cfg(feature = "rusqlite")]
235                    InnerConnection::Rusqlite(c) => c.commit(),
236                    #[cfg(feature = "mock")]
237                    InnerConnection::Mock(c) => {
238                        c.commit();
239                        Ok(())
240                    }
241                    #[cfg(feature = "proxy")]
242                    InnerConnection::Proxy(c) => {
243                        c.commit();
244                        Ok(())
245                    }
246                    #[allow(unreachable_patterns)]
247                    _ => Err(conn_err("Disconnected")),
248                }
249            }
250        );
251
252        result?;
253        self.open = false; // read by start_rollback
254        Ok(())
255    }
256
257    /// Rolls back a transaction explicitly
258    #[instrument(level = "trace")]
259    #[allow(unreachable_code, unused_mut)]
260    pub fn rollback(mut self) -> Result<(), DbErr> {
261        let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
262            "sea_orm.rollback",
263            self.backend,
264            "ROLLBACK",
265            record_stmt = false,
266            {
267                #[cfg(not(feature = "sync"))]
268                let conn = &mut *self.conn.lock();
269                #[cfg(feature = "sync")]
270                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
271
272                match conn {
273                    #[cfg(feature = "sqlx-mysql")]
274                    InnerConnection::MySql(c) => {
275                        <sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
276                            .map_err(sqlx_error_to_query_err)
277                    }
278                    #[cfg(feature = "sqlx-postgres")]
279                    InnerConnection::Postgres(c) => {
280                        <sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
281                            .map_err(sqlx_error_to_query_err)
282                    }
283                    #[cfg(feature = "sqlx-sqlite")]
284                    InnerConnection::Sqlite(c) => {
285                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
286                            .map_err(sqlx_error_to_query_err)
287                    }
288                    #[cfg(feature = "rusqlite")]
289                    InnerConnection::Rusqlite(c) => c.rollback(),
290                    #[cfg(feature = "mock")]
291                    InnerConnection::Mock(c) => {
292                        c.rollback();
293                        Ok(())
294                    }
295                    #[cfg(feature = "proxy")]
296                    InnerConnection::Proxy(c) => {
297                        c.rollback();
298                        Ok(())
299                    }
300                    #[allow(unreachable_patterns)]
301                    _ => Err(conn_err("Disconnected")),
302                }
303            }
304        );
305
306        result?;
307        self.open = false; // read by start_rollback
308        Ok(())
309    }
310
311    // the rollback is queued and will be performed on next operation, like returning the connection to the pool
312    #[instrument(level = "trace")]
313    fn start_rollback(&mut self) -> Result<(), DbErr> {
314        if self.open {
315            if let Some(mut conn) = self.conn.try_lock().ok() {
316                match &mut *conn {
317                    #[cfg(feature = "sqlx-mysql")]
318                    InnerConnection::MySql(c) => {
319                        <sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
320                    }
321                    #[cfg(feature = "sqlx-postgres")]
322                    InnerConnection::Postgres(c) => {
323                        <sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
324                    }
325                    #[cfg(feature = "sqlx-sqlite")]
326                    InnerConnection::Sqlite(c) => {
327                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
328                    }
329                    #[cfg(feature = "rusqlite")]
330                    InnerConnection::Rusqlite(c) => {
331                        c.start_rollback()?;
332                    }
333                    #[cfg(feature = "mock")]
334                    InnerConnection::Mock(c) => {
335                        c.rollback();
336                    }
337                    #[cfg(feature = "proxy")]
338                    InnerConnection::Proxy(c) => {
339                        c.start_rollback();
340                    }
341                    #[allow(unreachable_patterns)]
342                    _ => return Err(conn_err("Disconnected")),
343                }
344            } else {
345                //this should never happen
346                return Err(conn_err("Dropping a locked Transaction"));
347            }
348        }
349        Ok(())
350    }
351}
352
353impl TransactionSession for DatabaseTransaction {
354    fn commit(self) -> Result<(), DbErr> {
355        self.commit()
356    }
357
358    fn rollback(self) -> Result<(), DbErr> {
359        self.rollback()
360    }
361}
362
363impl Drop for DatabaseTransaction {
364    fn drop(&mut self) {
365        self.start_rollback().expect("Fail to rollback transaction");
366    }
367}
368
369impl ConnectionTrait for DatabaseTransaction {
370    fn get_database_backend(&self) -> DbBackend {
371        // this way we don't need to lock just to know the backend
372        self.backend
373    }
374
375    #[instrument(level = "trace")]
376    #[allow(unused_variables)]
377    fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
378        debug_print!("{}", stmt);
379
380        super::tracing_spans::with_db_span!(
381            "sea_orm.execute",
382            self.backend,
383            stmt.sql.as_str(),
384            record_stmt = self.record_stmt_in_spans,
385            {
386                #[cfg(not(feature = "sync"))]
387                let conn = &mut *self.conn.lock();
388                #[cfg(feature = "sync")]
389                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
390
391                match conn {
392                    #[cfg(feature = "sqlx-mysql")]
393                    InnerConnection::MySql(conn) => {
394                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
395                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
396                        crate::metric::metric!(self.metric_callback, &stmt, {
397                            query.execute(conn).map(Into::into)
398                        })
399                        .map_err(sqlx_error_to_exec_err)
400                    }
401                    #[cfg(feature = "sqlx-postgres")]
402                    InnerConnection::Postgres(conn) => {
403                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
404                        let conn: &mut sqlx::PgConnection = &mut *conn;
405                        crate::metric::metric!(self.metric_callback, &stmt, {
406                            query.execute(conn).map(Into::into)
407                        })
408                        .map_err(sqlx_error_to_exec_err)
409                    }
410                    #[cfg(feature = "sqlx-sqlite")]
411                    InnerConnection::Sqlite(conn) => {
412                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
413                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
414                        crate::metric::metric!(self.metric_callback, &stmt, {
415                            query.execute(conn).map(Into::into)
416                        })
417                        .map_err(sqlx_error_to_exec_err)
418                    }
419                    #[cfg(feature = "rusqlite")]
420                    InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback),
421                    #[cfg(feature = "mock")]
422                    InnerConnection::Mock(conn) => conn.execute(stmt),
423                    #[cfg(feature = "proxy")]
424                    InnerConnection::Proxy(conn) => conn.execute(stmt),
425                    #[allow(unreachable_patterns)]
426                    _ => Err(conn_err("Disconnected")),
427                }
428            }
429        )
430    }
431
432    #[instrument(level = "trace")]
433    #[allow(unused_variables)]
434    fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
435        debug_print!("{}", sql);
436
437        super::tracing_spans::with_db_span!(
438            "sea_orm.execute_unprepared",
439            self.backend,
440            sql,
441            record_stmt = false,
442            {
443                #[cfg(not(feature = "sync"))]
444                let conn = &mut *self.conn.lock();
445                #[cfg(feature = "sync")]
446                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
447
448                match conn {
449                    #[cfg(feature = "sqlx-mysql")]
450                    InnerConnection::MySql(conn) => {
451                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
452                        sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
453                            .map(Into::into)
454                            .map_err(sqlx_error_to_exec_err)
455                    }
456                    #[cfg(feature = "sqlx-postgres")]
457                    InnerConnection::Postgres(conn) => {
458                        let conn: &mut sqlx::PgConnection = &mut *conn;
459                        sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
460                            .map(Into::into)
461                            .map_err(sqlx_error_to_exec_err)
462                    }
463                    #[cfg(feature = "sqlx-sqlite")]
464                    InnerConnection::Sqlite(conn) => {
465                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
466                        sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
467                            .map(Into::into)
468                            .map_err(sqlx_error_to_exec_err)
469                    }
470                    #[cfg(feature = "rusqlite")]
471                    InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql),
472                    #[cfg(feature = "mock")]
473                    InnerConnection::Mock(conn) => {
474                        let db_backend = conn.get_database_backend();
475                        let stmt = Statement::from_string(db_backend, sql);
476                        conn.execute(stmt)
477                    }
478                    #[cfg(feature = "proxy")]
479                    InnerConnection::Proxy(conn) => {
480                        let db_backend = conn.get_database_backend();
481                        let stmt = Statement::from_string(db_backend, sql);
482                        conn.execute(stmt)
483                    }
484                    #[allow(unreachable_patterns)]
485                    _ => Err(conn_err("Disconnected")),
486                }
487            }
488        )
489    }
490
491    #[instrument(level = "trace")]
492    #[allow(unused_variables)]
493    fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
494        debug_print!("{}", stmt);
495
496        super::tracing_spans::with_db_span!(
497            "sea_orm.query_one",
498            self.backend,
499            stmt.sql.as_str(),
500            record_stmt = self.record_stmt_in_spans,
501            {
502                #[cfg(not(feature = "sync"))]
503                let conn = &mut *self.conn.lock();
504                #[cfg(feature = "sync")]
505                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
506
507                match conn {
508                    #[cfg(feature = "sqlx-mysql")]
509                    InnerConnection::MySql(conn) => {
510                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
511                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
512                        crate::metric::metric!(self.metric_callback, &stmt, {
513                            crate::sqlx_map_err_ignore_not_found(
514                                query.fetch_one(conn).map(|row| Some(row.into())),
515                            )
516                        })
517                    }
518                    #[cfg(feature = "sqlx-postgres")]
519                    InnerConnection::Postgres(conn) => {
520                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
521                        let conn: &mut sqlx::PgConnection = &mut *conn;
522                        crate::metric::metric!(self.metric_callback, &stmt, {
523                            crate::sqlx_map_err_ignore_not_found(
524                                query.fetch_one(conn).map(|row| Some(row.into())),
525                            )
526                        })
527                    }
528                    #[cfg(feature = "sqlx-sqlite")]
529                    InnerConnection::Sqlite(conn) => {
530                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
531                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
532                        crate::metric::metric!(self.metric_callback, &stmt, {
533                            crate::sqlx_map_err_ignore_not_found(
534                                query.fetch_one(conn).map(|row| Some(row.into())),
535                            )
536                        })
537                    }
538                    #[cfg(feature = "rusqlite")]
539                    InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback),
540                    #[cfg(feature = "mock")]
541                    InnerConnection::Mock(conn) => conn.query_one(stmt),
542                    #[cfg(feature = "proxy")]
543                    InnerConnection::Proxy(conn) => conn.query_one(stmt),
544                    #[allow(unreachable_patterns)]
545                    _ => Err(conn_err("Disconnected")),
546                }
547            }
548        )
549    }
550
551    #[instrument(level = "trace")]
552    #[allow(unused_variables)]
553    fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
554        debug_print!("{}", stmt);
555
556        super::tracing_spans::with_db_span!(
557            "sea_orm.query_all",
558            self.backend,
559            stmt.sql.as_str(),
560            record_stmt = self.record_stmt_in_spans,
561            {
562                #[cfg(not(feature = "sync"))]
563                let conn = &mut *self.conn.lock();
564                #[cfg(feature = "sync")]
565                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
566
567                match conn {
568                    #[cfg(feature = "sqlx-mysql")]
569                    InnerConnection::MySql(conn) => {
570                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
571                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
572                        crate::metric::metric!(self.metric_callback, &stmt, {
573                            query
574                                .fetch_all(conn)
575                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
576                                .map_err(sqlx_error_to_query_err)
577                        })
578                    }
579                    #[cfg(feature = "sqlx-postgres")]
580                    InnerConnection::Postgres(conn) => {
581                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
582                        let conn: &mut sqlx::PgConnection = &mut *conn;
583                        crate::metric::metric!(self.metric_callback, &stmt, {
584                            query
585                                .fetch_all(conn)
586                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
587                                .map_err(sqlx_error_to_query_err)
588                        })
589                    }
590                    #[cfg(feature = "sqlx-sqlite")]
591                    InnerConnection::Sqlite(conn) => {
592                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
593                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
594                        crate::metric::metric!(self.metric_callback, &stmt, {
595                            query
596                                .fetch_all(conn)
597                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
598                                .map_err(sqlx_error_to_query_err)
599                        })
600                    }
601                    #[cfg(feature = "rusqlite")]
602                    InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback),
603                    #[cfg(feature = "mock")]
604                    InnerConnection::Mock(conn) => conn.query_all(stmt),
605                    #[cfg(feature = "proxy")]
606                    InnerConnection::Proxy(conn) => conn.query_all(stmt),
607                    #[allow(unreachable_patterns)]
608                    _ => Err(conn_err("Disconnected")),
609                }
610            }
611        )
612    }
613}
614
615impl StreamTrait for DatabaseTransaction {
616    type Stream<'a> = TransactionStream<'a>;
617
618    fn get_database_backend(&self) -> DbBackend {
619        self.backend
620    }
621
622    #[instrument(level = "trace")]
623    fn stream_raw<'a>(&'a self, stmt: Statement) -> Result<Self::Stream<'a>, DbErr> {
624        ({
625            #[cfg(not(feature = "sync"))]
626            let conn = self.conn.lock();
627            #[cfg(feature = "sync")]
628            let conn = self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
629            Ok(crate::TransactionStream::build(
630                conn,
631                stmt,
632                self.metric_callback.clone(),
633            ))
634        })
635    }
636}
637
638impl TransactionTrait for DatabaseTransaction {
639    type Transaction = DatabaseTransaction;
640
641    #[instrument(level = "trace")]
642    fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
643        DatabaseTransaction::begin(
644            Arc::clone(&self.conn),
645            self.backend,
646            self.metric_callback.clone(),
647            self.record_stmt_in_spans,
648            None,
649            None,
650            None,
651        )
652    }
653
654    #[instrument(level = "trace")]
655    fn begin_with_config(
656        &self,
657        isolation_level: Option<IsolationLevel>,
658        access_mode: Option<AccessMode>,
659    ) -> Result<DatabaseTransaction, DbErr> {
660        DatabaseTransaction::begin(
661            Arc::clone(&self.conn),
662            self.backend,
663            self.metric_callback.clone(),
664            self.record_stmt_in_spans,
665            isolation_level,
666            access_mode,
667            None,
668        )
669    }
670
671    #[instrument(level = "trace")]
672    fn begin_with_options(
673        &self,
674        options: TransactionOptions,
675    ) -> Result<DatabaseTransaction, DbErr> {
676        DatabaseTransaction::begin(
677            Arc::clone(&self.conn),
678            self.backend,
679            self.metric_callback.clone(),
680            self.record_stmt_in_spans,
681            options.isolation_level,
682            options.access_mode,
683            options.sqlite_transaction_mode,
684        )
685    }
686
687    /// Execute the function inside a transaction.
688    /// If the function returns an error, the transaction will be rolled back.
689    /// Otherwise, the transaction will be committed.
690    #[instrument(level = "trace", skip(_callback))]
691    fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
692    where
693        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
694        E: std::fmt::Display + std::fmt::Debug,
695    {
696        let transaction = self.begin().map_err(TransactionError::Connection)?;
697        transaction.run(_callback)
698    }
699
700    /// Execute the function inside a transaction.
701    /// If the function returns an error, the transaction will be rolled back.
702    /// Otherwise, the transaction will be committed.
703    #[instrument(level = "trace", skip(_callback))]
704    fn transaction_with_config<F, T, E>(
705        &self,
706        _callback: F,
707        isolation_level: Option<IsolationLevel>,
708        access_mode: Option<AccessMode>,
709    ) -> Result<T, TransactionError<E>>
710    where
711        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
712        E: std::fmt::Display + std::fmt::Debug,
713    {
714        let transaction = self
715            .begin_with_config(isolation_level, access_mode)
716            .map_err(TransactionError::Connection)?;
717        transaction.run(_callback)
718    }
719}
720
721/// Defines errors for handling transaction failures
722#[derive(Debug)]
723pub enum TransactionError<E> {
724    /// A Database connection error
725    Connection(DbErr),
726    /// An error occurring when doing database transactions
727    Transaction(E),
728}
729
730impl<E> std::fmt::Display for TransactionError<E>
731where
732    E: std::fmt::Display + std::fmt::Debug,
733{
734    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
735        match self {
736            TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
737            TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
738        }
739    }
740}
741
742impl<E> std::error::Error for TransactionError<E> where E: std::fmt::Display + std::fmt::Debug {}
743
744impl<E> From<DbErr> for TransactionError<E>
745where
746    E: std::fmt::Display + std::fmt::Debug,
747{
748    fn from(e: DbErr) -> Self {
749        Self::Connection(e)
750    }
751}