Skip to main content

sea_orm/database/
transaction.rs

1#![allow(unused_assignments)]
2use std::sync::Arc;
3
4#[cfg(feature = "sqlx-sqlite")]
5use sqlx_core::sql_str::SqlSafeStr;
6#[cfg(feature = "sqlx-dep")]
7use sqlx_core::transaction::TransactionManager;
8use std::sync::Mutex;
9use tracing::instrument;
10
11use crate::{
12    AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
13    QueryResult, SqliteTransactionMode, Statement, TransactionOptions, TransactionSession,
14    TransactionTrait, debug_print, error::*,
15};
16#[cfg(feature = "sqlx-dep")]
17use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
18
19#[cfg(feature = "stream")]
20use crate::{StreamTrait, TransactionStream};
21
/// A live database transaction.
///
/// Obtain one with
/// [`TransactionTrait::begin`](crate::TransactionTrait::begin) (manual
/// commit/rollback) or
/// [`TransactionTrait::transaction`](crate::TransactionTrait::transaction)
/// (auto commit on `Ok`, auto rollback on `Err`). Like
/// [`DatabaseConnection`](crate::DatabaseConnection) it implements
/// [`ConnectionTrait`](crate::ConnectionTrait), so SeaORM's query and
/// mutation methods work against it transparently. Calling `begin` on a
/// transaction starts a nested transaction via `SAVEPOINT`.
///
/// A transaction that is dropped while still open (neither committed nor
/// rolled back successfully) triggers a rollback from its `Drop` impl.
pub struct DatabaseTransaction {
    /// Shared, mutex-guarded handle to the underlying connection. Nested
    /// transactions clone this `Arc`, so they operate on the same connection.
    conn: Arc<Mutex<InnerConnection>>,
    /// Backend of `conn`, stored separately so it can be read without locking.
    backend: DbBackend,
    /// `true` from construction until `commit` or `rollback` succeeds;
    /// the drop-time rollback only runs while this is still `true`.
    open: bool,
    /// Optional metric hook handed to the statement-executing code paths.
    metric_callback: Option<crate::metric::Callback>,
    /// Forwarded as the `record_stmt` flag of the execute/query tracing spans.
    record_stmt_in_spans: bool,
}
40
41#[instrument(level = "trace", skip(transaction, callback))]
42pub(crate) fn run_async_transaction_callback<Txn, F, T, E>(
43    transaction: Txn,
44    callback: F,
45) -> Result<T, TransactionError<E>>
46where
47    Txn: TransactionSession,
48    F: for<'b> FnOnce(&'b Txn) -> Result<T, E>,
49    E: std::fmt::Display + std::fmt::Debug,
50{
51    let res = callback(&transaction).map_err(TransactionError::Transaction);
52    if res.is_ok() {
53        transaction.commit().map_err(TransactionError::Connection)?;
54    } else {
55        transaction
56            .rollback()
57            .map_err(TransactionError::Connection)?;
58    }
59    res
60}
61
62impl std::fmt::Debug for DatabaseTransaction {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        write!(f, "DatabaseTransaction")
65    }
66}
67
68impl DatabaseTransaction {
    /// Starts a transaction on `conn` and returns the handle wrapping it.
    ///
    /// The handle is built first (with `open` set to `true`) and the
    /// backend-specific "begin" is then issued through the locked connection
    /// inside a `sea_orm.begin` tracing span.
    ///
    /// * `isolation_level` / `access_mode` are forwarded to the driver's
    ///   `set_transaction_config` for the sqlx backends.
    /// * `sqlite_transaction_mode` is only consulted by the SQLite backends
    ///   (sqlx-sqlite at transaction depth 0, and rusqlite).
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` produced while locking the connection, applying
    /// the transaction config, or beginning the transaction, and
    /// `conn_err("Disconnected")` when no backend arm matches.
    ///
    /// On any such error the freshly built handle is dropped while `open` is
    /// still `true`, so its `Drop` impl runs `start_rollback`.
    // NOTE(review): that drop-time rollback also runs when the begin itself never
    // took effect — confirm this is harmless for every backend.
    #[instrument(level = "trace", skip(metric_callback))]
    pub(crate) fn begin(
        conn: Arc<Mutex<InnerConnection>>,
        backend: DbBackend,
        metric_callback: Option<crate::metric::Callback>,
        record_stmt_in_spans: bool,
        isolation_level: Option<IsolationLevel>,
        access_mode: Option<AccessMode>,
        sqlite_transaction_mode: Option<SqliteTransactionMode>,
    ) -> Result<DatabaseTransaction, DbErr> {
        // Build the handle up front; it owns the connection for the rest of this function.
        let res = DatabaseTransaction {
            conn,
            backend,
            open: true,
            metric_callback,
            record_stmt_in_spans,
        };

        let begin_result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
            "sea_orm.begin",
            backend,
            "BEGIN",
            record_stmt = false,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *res.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(c) => {
                        // in MySQL SET TRANSACTION operations must be executed before transaction start
                        crate::driver::sqlx_mysql::set_transaction_config(
                            c,
                            isolation_level,
                            access_mode,
                        )?;
                        <sqlx::MySql as sqlx::Database>::TransactionManager::begin(c, None)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(c) => {
                        <sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c, None)
                            .map_err(sqlx_error_to_query_err)?;
                        // in PostgreSQL SET TRANSACTION operations must be executed inside transaction
                        crate::driver::sqlx_postgres::set_transaction_config(
                            c,
                            isolation_level,
                            access_mode,
                        )
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(c) => {
                        crate::driver::sqlx_sqlite::set_transaction_config(
                            c,
                            isolation_level,
                            access_mode,
                        )?;
                        // Depth 0 means this is the outermost transaction on the connection.
                        let depth = <sqlx::Sqlite as sqlx::Database>::TransactionManager::get_transaction_depth(c);
                        let statement = if depth == 0 {
                            // Custom `BEGIN <mode>` statement only when a mode was requested;
                            // otherwise `None` is passed and the manager picks the statement.
                            sqlite_transaction_mode.map(|mode| {
                                sqlx::AssertSqlSafe(format!("BEGIN {}", mode.sqlite_keyword()))
                                    .into_sql_str()
                            })
                        } else {
                            // Nested transaction uses SAVEPOINT; the mode only applies to the top-level BEGIN
                            None
                        };
                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(c) => c.begin(sqlite_transaction_mode),
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(c) => {
                        c.begin();
                        Ok(())
                    }
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(c) => {
                        c.begin();
                        Ok(())
                    }
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        );

        // Propagate a failed begin; `res` is dropped on this path (see `# Errors`).
        begin_result?;
        Ok(res)
    }
162
163    /// Runs a transaction to completion passing through the result.
164    /// Rolling back the transaction on encountering an error.
165    #[instrument(level = "trace", skip(callback))]
166    pub(crate) fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
167    where
168        F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
169        E: std::fmt::Display + std::fmt::Debug,
170    {
171        let res = callback(&self).map_err(TransactionError::Transaction);
172        if res.is_ok() {
173            self.commit().map_err(TransactionError::Connection)?;
174        } else {
175            self.rollback().map_err(TransactionError::Connection)?;
176        }
177        res
178    }
179
180    /// Execute the function inside a transaction.
181    /// If the function returns an error, the transaction will be rolled back.
182    /// Otherwise, the transaction will be committed.
183    #[instrument(level = "trace", skip(callback))]
184    pub fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
185    where
186        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
187        E: std::fmt::Display + std::fmt::Debug,
188    {
189        let transaction = self.begin().map_err(TransactionError::Connection)?;
190        run_async_transaction_callback(transaction, callback)
191    }
192
193    /// Execute the function inside a transaction with isolation level and/or access mode.
194    /// If the function returns an error, the transaction will be rolled back.
195    /// Otherwise, the transaction will be committed.
196    #[instrument(level = "trace", skip(callback))]
197    pub fn transaction_with_config<F, T, E>(
198        &self,
199        callback: F,
200        isolation_level: Option<IsolationLevel>,
201        access_mode: Option<AccessMode>,
202    ) -> Result<T, TransactionError<E>>
203    where
204        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
205        E: std::fmt::Display + std::fmt::Debug,
206    {
207        let transaction = self
208            .begin_with_config(isolation_level, access_mode)
209            .map_err(TransactionError::Connection)?;
210        run_async_transaction_callback(transaction, callback)
211    }
212
    /// Commits the transaction, consuming the handle.
    ///
    /// The backend-specific commit is issued through the locked connection
    /// inside a `sea_orm.commit` tracing span.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` from locking the connection or from the backend's
    /// commit, and `conn_err("Disconnected")` when no backend arm matches.
    /// On error `open` is left `true`, so dropping the handle at the end of
    /// this call still runs `start_rollback`.
    #[instrument(level = "trace")]
    // NOTE(review): presumably these lints fire only for some feature combinations — confirm.
    #[allow(unreachable_code, unused_mut)]
    pub fn commit(mut self) -> Result<(), DbErr> {
        let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
            "sea_orm.commit",
            self.backend,
            "COMMIT",
            record_stmt = false,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *self.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(c) => {
                        <sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(c) => {
                        <sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(c) => {
                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(c) => c.commit(),
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(c) => {
                        c.commit();
                        Ok(())
                    }
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(c) => {
                        c.commit();
                        Ok(())
                    }
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        );

        // Bail out before clearing `open` so a failed commit is still rolled back on drop.
        result?;
        self.open = false; // read by start_rollback
        Ok(())
    }
266
    /// Rolls back the transaction explicitly, consuming the handle.
    ///
    /// The backend-specific rollback is issued through the locked connection
    /// inside a `sea_orm.rollback` tracing span.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` from locking the connection or from the backend's
    /// rollback, and `conn_err("Disconnected")` when no backend arm matches.
    /// On error `open` is left `true`, so dropping the handle at the end of
    /// this call still runs `start_rollback`.
    #[instrument(level = "trace")]
    // NOTE(review): presumably these lints fire only for some feature combinations — confirm.
    #[allow(unreachable_code, unused_mut)]
    pub fn rollback(mut self) -> Result<(), DbErr> {
        let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
            "sea_orm.rollback",
            self.backend,
            "ROLLBACK",
            record_stmt = false,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *self.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(c) => {
                        <sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(c) => {
                        <sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(c) => {
                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
                            .map_err(sqlx_error_to_query_err)
                    }
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(c) => c.rollback(),
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(c) => {
                        c.rollback();
                        Ok(())
                    }
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(c) => {
                        c.rollback();
                        Ok(())
                    }
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        );

        // Bail out before clearing `open` so the drop-time rollback still runs after a failure.
        result?;
        self.open = false; // read by start_rollback
        Ok(())
    }
320
321    // the rollback is queued and will be performed on next operation, like returning the connection to the pool
322    #[instrument(level = "trace")]
323    fn start_rollback(&mut self) -> Result<(), DbErr> {
324        if self.open {
325            if let Some(mut conn) = self.conn.try_lock().ok() {
326                match &mut *conn {
327                    #[cfg(feature = "sqlx-mysql")]
328                    InnerConnection::MySql(c) => {
329                        <sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
330                    }
331                    #[cfg(feature = "sqlx-postgres")]
332                    InnerConnection::Postgres(c) => {
333                        <sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
334                    }
335                    #[cfg(feature = "sqlx-sqlite")]
336                    InnerConnection::Sqlite(c) => {
337                        <sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
338                    }
339                    #[cfg(feature = "rusqlite")]
340                    InnerConnection::Rusqlite(c) => {
341                        c.start_rollback()?;
342                    }
343                    #[cfg(feature = "mock")]
344                    InnerConnection::Mock(c) => {
345                        c.rollback();
346                    }
347                    #[cfg(feature = "proxy")]
348                    InnerConnection::Proxy(c) => {
349                        c.start_rollback();
350                    }
351                    #[allow(unreachable_patterns)]
352                    _ => return Err(conn_err("Disconnected")),
353                }
354            } else {
355                //this should never happen
356                return Err(conn_err("Dropping a locked Transaction"));
357            }
358        }
359        Ok(())
360    }
361}
362
363impl TransactionSession for DatabaseTransaction {
364    fn commit(self) -> Result<(), DbErr> {
365        self.commit()
366    }
367
368    fn rollback(self) -> Result<(), DbErr> {
369        self.rollback()
370    }
371}
372
373impl Drop for DatabaseTransaction {
374    fn drop(&mut self) {
375        self.start_rollback().expect("Fail to rollback transaction");
376    }
377}
378
379impl ConnectionTrait for DatabaseTransaction {
380    fn get_database_backend(&self) -> DbBackend {
381        // this way we don't need to lock just to know the backend
382        self.backend
383    }
384
    /// Executes `stmt` on the transaction's connection and returns the
    /// resulting [`ExecResult`].
    ///
    /// The statement is printed via `debug_print!`, then executed inside a
    /// `sea_orm.execute` tracing span; whether the SQL text is recorded in the
    /// span is controlled by `record_stmt_in_spans`. For the sqlx backends the
    /// execution is wrapped in the `metric!` macro with this transaction's
    /// metric callback.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` from locking the connection or from the backend,
    /// and `conn_err("Disconnected")` when no backend arm matches.
    #[instrument(level = "trace", skip(stmt))]
    #[allow(unused_variables)]
    fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
        debug_print!("{}", stmt);

        super::tracing_spans::with_db_span!(
            "sea_orm.execute",
            self.backend,
            stmt.sql.as_str(),
            record_stmt = self.record_stmt_in_spans,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *self.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(conn) => {
                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            query.execute(conn).map(Into::into)
                        })
                        .map_err(sqlx_error_to_exec_err)
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(conn) => {
                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
                        let conn: &mut sqlx::PgConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            query.execute(conn).map(Into::into)
                        })
                        .map_err(sqlx_error_to_exec_err)
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(conn) => {
                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            query.execute(conn).map(Into::into)
                        })
                        .map_err(sqlx_error_to_exec_err)
                    }
                    // The rusqlite driver receives the metric callback directly.
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback),
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(conn) => conn.execute(stmt),
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(conn) => conn.execute(stmt),
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        )
    }
441
    /// Executes the raw SQL string `sql` on the transaction's connection
    /// without building a prepared [`Statement`] for the sqlx and rusqlite
    /// backends.
    ///
    /// The SQL is printed via `debug_print!`, then executed inside a
    /// `sea_orm.execute_unprepared` tracing span. This span always passes
    /// `record_stmt = false`, independent of `record_stmt_in_spans`, and the
    /// sqlx arms do not go through the `metric!` macro.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` from locking the connection or from the backend,
    /// and `conn_err("Disconnected")` when no backend arm matches.
    #[instrument(level = "trace", skip(sql))]
    #[allow(unused_variables)]
    fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
        debug_print!("{}", sql);

        super::tracing_spans::with_db_span!(
            "sea_orm.execute_unprepared",
            self.backend,
            sql,
            record_stmt = false,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *self.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(conn) => {
                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
                        sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
                            .map(Into::into)
                            .map_err(sqlx_error_to_exec_err)
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(conn) => {
                        let conn: &mut sqlx::PgConnection = &mut *conn;
                        sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
                            .map(Into::into)
                            .map_err(sqlx_error_to_exec_err)
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(conn) => {
                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
                        sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
                            .map(Into::into)
                            .map_err(sqlx_error_to_exec_err)
                    }
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql),
                    // Mock and proxy connections only accept a `Statement`, so wrap the SQL in one.
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(conn) => {
                        let db_backend = conn.get_database_backend();
                        let stmt = Statement::from_string(db_backend, sql);
                        conn.execute(stmt)
                    }
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(conn) => {
                        let db_backend = conn.get_database_backend();
                        let stmt = Statement::from_string(db_backend, sql);
                        conn.execute(stmt)
                    }
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        )
    }
500
    /// Runs `stmt` on the transaction's connection and returns at most one
    /// row.
    ///
    /// The statement is printed via `debug_print!`, then executed inside a
    /// `sea_orm.query_one` tracing span; whether the SQL text is recorded in
    /// the span is controlled by `record_stmt_in_spans`. The sqlx arms use
    /// `fetch_one` and pass the result through
    /// `crate::sqlx_map_err_ignore_not_found`.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` from locking the connection or from the backend,
    /// and `conn_err("Disconnected")` when no backend arm matches.
    #[instrument(level = "trace", skip(stmt))]
    #[allow(unused_variables)]
    fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
        debug_print!("{}", stmt);

        super::tracing_spans::with_db_span!(
            "sea_orm.query_one",
            self.backend,
            stmt.sql.as_str(),
            record_stmt = self.record_stmt_in_spans,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *self.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(conn) => {
                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            // NOTE(review): presumably turns sqlx's "row not found" into `Ok(None)` — confirm.
                            crate::sqlx_map_err_ignore_not_found(
                                query.fetch_one(conn).map(|row| Some(row.into())),
                            )
                        })
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(conn) => {
                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
                        let conn: &mut sqlx::PgConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            crate::sqlx_map_err_ignore_not_found(
                                query.fetch_one(conn).map(|row| Some(row.into())),
                            )
                        })
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(conn) => {
                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            crate::sqlx_map_err_ignore_not_found(
                                query.fetch_one(conn).map(|row| Some(row.into())),
                            )
                        })
                    }
                    // The rusqlite driver receives the metric callback directly.
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback),
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(conn) => conn.query_one(stmt),
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(conn) => conn.query_one(stmt),
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        )
    }
560
    /// Runs `stmt` on the transaction's connection and returns every row it
    /// produces.
    ///
    /// The statement is printed via `debug_print!`, then executed inside a
    /// `sea_orm.query_all` tracing span; whether the SQL text is recorded in
    /// the span is controlled by `record_stmt_in_spans`. The sqlx arms use
    /// `fetch_all` and convert each driver row into a [`QueryResult`].
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` from locking the connection or from the backend,
    /// and `conn_err("Disconnected")` when no backend arm matches.
    #[instrument(level = "trace", skip(stmt))]
    #[allow(unused_variables)]
    fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
        debug_print!("{}", stmt);

        super::tracing_spans::with_db_span!(
            "sea_orm.query_all",
            self.backend,
            stmt.sql.as_str(),
            record_stmt = self.record_stmt_in_spans,
            {
                // Exactly one of the two `let conn` lines is compiled, depending on the `sync` feature.
                #[cfg(not(feature = "sync"))]
                let conn = &mut *self.conn.lock();
                #[cfg(feature = "sync")]
                let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

                match conn {
                    #[cfg(feature = "sqlx-mysql")]
                    InnerConnection::MySql(conn) => {
                        let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
                        let conn: &mut sqlx::MySqlConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            query
                                .fetch_all(conn)
                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
                                .map_err(sqlx_error_to_query_err)
                        })
                    }
                    #[cfg(feature = "sqlx-postgres")]
                    InnerConnection::Postgres(conn) => {
                        let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
                        let conn: &mut sqlx::PgConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            query
                                .fetch_all(conn)
                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
                                .map_err(sqlx_error_to_query_err)
                        })
                    }
                    #[cfg(feature = "sqlx-sqlite")]
                    InnerConnection::Sqlite(conn) => {
                        let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
                        let conn: &mut sqlx::SqliteConnection = &mut *conn;
                        crate::metric::metric!(self.metric_callback, &stmt, {
                            query
                                .fetch_all(conn)
                                .map(|rows| rows.into_iter().map(|r| r.into()).collect())
                                .map_err(sqlx_error_to_query_err)
                        })
                    }
                    // The rusqlite driver receives the metric callback directly.
                    #[cfg(feature = "rusqlite")]
                    InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback),
                    #[cfg(feature = "mock")]
                    InnerConnection::Mock(conn) => conn.query_all(stmt),
                    #[cfg(feature = "proxy")]
                    InnerConnection::Proxy(conn) => conn.query_all(stmt),
                    // Fallback for any connection variant not handled above.
                    #[allow(unreachable_patterns)]
                    _ => Err(conn_err("Disconnected")),
                }
            }
        )
    }
623}
624
#[cfg(feature = "stream")]
impl StreamTrait for DatabaseTransaction {
    type Stream<'a> = TransactionStream<'a>;

    /// Returns the backend stored on the transaction; no lock is taken.
    fn get_database_backend(&self) -> DbBackend {
        let Self { backend, .. } = self;
        *backend
    }

    /// Locks the connection and hands the guard, the statement and a clone
    /// of the metric callback to [`TransactionStream::build`].
    #[instrument(level = "trace", skip(stmt))]
    fn stream_raw<'a>(&'a self, stmt: Statement) -> Result<Self::Stream<'a>, DbErr> {
        // Exactly one of the two `let guard` lines is compiled, depending on the `sync` feature.
        #[cfg(not(feature = "sync"))]
        let guard = self.conn.lock();
        #[cfg(feature = "sync")]
        let guard = self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;

        let callback = self.metric_callback.clone();
        Ok(TransactionStream::build(guard, stmt, callback))
    }
}
648
649impl TransactionTrait for DatabaseTransaction {
650    type Transaction = DatabaseTransaction;
651
652    #[instrument(level = "trace")]
653    fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
654        DatabaseTransaction::begin(
655            Arc::clone(&self.conn),
656            self.backend,
657            self.metric_callback.clone(),
658            self.record_stmt_in_spans,
659            None,
660            None,
661            None,
662        )
663    }
664
665    #[instrument(level = "trace")]
666    fn begin_with_config(
667        &self,
668        isolation_level: Option<IsolationLevel>,
669        access_mode: Option<AccessMode>,
670    ) -> Result<DatabaseTransaction, DbErr> {
671        DatabaseTransaction::begin(
672            Arc::clone(&self.conn),
673            self.backend,
674            self.metric_callback.clone(),
675            self.record_stmt_in_spans,
676            isolation_level,
677            access_mode,
678            None,
679        )
680    }
681
682    #[instrument(level = "trace")]
683    fn begin_with_options(
684        &self,
685        options: TransactionOptions,
686    ) -> Result<DatabaseTransaction, DbErr> {
687        DatabaseTransaction::begin(
688            Arc::clone(&self.conn),
689            self.backend,
690            self.metric_callback.clone(),
691            self.record_stmt_in_spans,
692            options.isolation_level,
693            options.access_mode,
694            options.sqlite_transaction_mode,
695        )
696    }
697
698    /// Execute the function inside a transaction.
699    /// If the function returns an error, the transaction will be rolled back.
700    /// Otherwise, the transaction will be committed.
701    #[instrument(level = "trace", skip(_callback))]
702    fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
703    where
704        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
705        E: std::fmt::Display + std::fmt::Debug,
706    {
707        let transaction = self.begin().map_err(TransactionError::Connection)?;
708        transaction.run(_callback)
709    }
710
711    /// Execute the function inside a transaction.
712    /// If the function returns an error, the transaction will be rolled back.
713    /// Otherwise, the transaction will be committed.
714    #[instrument(level = "trace", skip(_callback))]
715    fn transaction_with_config<F, T, E>(
716        &self,
717        _callback: F,
718        isolation_level: Option<IsolationLevel>,
719        access_mode: Option<AccessMode>,
720    ) -> Result<T, TransactionError<E>>
721    where
722        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
723        E: std::fmt::Display + std::fmt::Debug,
724    {
725        let transaction = self
726            .begin_with_config(isolation_level, access_mode)
727            .map_err(TransactionError::Connection)?;
728        transaction.run(_callback)
729    }
730}
731
/// Error returned by [`TransactionTrait::transaction`](crate::TransactionTrait::transaction):
/// either the database itself failed, or the user closure returned an `Err`
/// (causing a rollback).
///
/// `E` is the error type of the user closure.
#[derive(Debug)]
pub enum TransactionError<E> {
    /// Failure issuing `BEGIN`, `COMMIT`, or `ROLLBACK`.
    ///
    /// This is also what is returned when the closure failed *and* the
    /// subsequent rollback failed; the closure's own error is discarded then.
    Connection(DbErr),
    /// The closure returned an `Err` (the transaction has been rolled back).
    Transaction(E),
}
742
743impl<E> std::fmt::Display for TransactionError<E>
744where
745    E: std::fmt::Display + std::fmt::Debug,
746{
747    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
748        match self {
749            TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
750            TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
751        }
752    }
753}
754
// Marker impl only: no method is overridden, so the default `source()` applies
// and the wrapped error is exposed solely through `Display`/`Debug`.
impl<E> std::error::Error for TransactionError<E> where E: std::fmt::Display + std::fmt::Debug {}
756
757impl<E> From<DbErr> for TransactionError<E>
758where
759    E: std::fmt::Display + std::fmt::Debug,
760{
761    fn from(e: DbErr) -> Self {
762        Self::Connection(e)
763    }
764}