1#![allow(unused_assignments)]
2use std::sync::Arc;
3
4#[cfg(feature = "sqlx-sqlite")]
5use sqlx_core::sql_str::SqlSafeStr;
6#[cfg(feature = "sqlx-dep")]
7use sqlx_core::transaction::TransactionManager;
8use std::sync::Mutex;
9use tracing::instrument;
10
11use crate::{
12 AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
13 QueryResult, SqliteTransactionMode, Statement, StreamTrait, TransactionOptions,
14 TransactionSession, TransactionStream, TransactionTrait, debug_print, error::*,
15};
16#[cfg(feature = "sqlx-dep")]
17use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
18
19pub struct DatabaseTransaction {
24 conn: Arc<Mutex<InnerConnection>>,
25 backend: DbBackend,
26 open: bool,
27 metric_callback: Option<crate::metric::Callback>,
28 record_stmt_in_spans: bool,
29}
30
31#[instrument(level = "trace", skip(transaction, callback))]
32pub(crate) fn run_async_transaction_callback<Txn, F, T, E>(
33 transaction: Txn,
34 callback: F,
35) -> Result<T, TransactionError<E>>
36where
37 Txn: TransactionSession,
38 F: for<'b> FnOnce(&'b Txn) -> Result<T, E>,
39 E: std::fmt::Display + std::fmt::Debug,
40{
41 let res = callback(&transaction).map_err(TransactionError::Transaction);
42 if res.is_ok() {
43 transaction.commit().map_err(TransactionError::Connection)?;
44 } else {
45 transaction
46 .rollback()
47 .map_err(TransactionError::Connection)?;
48 }
49 res
50}
51
52impl std::fmt::Debug for DatabaseTransaction {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "DatabaseTransaction")
55 }
56}
57
58impl DatabaseTransaction {
59 #[instrument(level = "trace", skip(metric_callback))]
60 pub(crate) fn begin(
61 conn: Arc<Mutex<InnerConnection>>,
62 backend: DbBackend,
63 metric_callback: Option<crate::metric::Callback>,
64 record_stmt_in_spans: bool,
65 isolation_level: Option<IsolationLevel>,
66 access_mode: Option<AccessMode>,
67 sqlite_transaction_mode: Option<SqliteTransactionMode>,
68 ) -> Result<DatabaseTransaction, DbErr> {
69 let res = DatabaseTransaction {
70 conn,
71 backend,
72 open: true,
73 metric_callback,
74 record_stmt_in_spans,
75 };
76
77 let begin_result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
78 "sea_orm.begin",
79 backend,
80 "BEGIN",
81 record_stmt = false,
82 {
83 #[cfg(not(feature = "sync"))]
84 let conn = &mut *res.conn.lock();
85 #[cfg(feature = "sync")]
86 let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
87
88 match conn {
89 #[cfg(feature = "sqlx-mysql")]
90 InnerConnection::MySql(c) => {
91 crate::driver::sqlx_mysql::set_transaction_config(
93 c,
94 isolation_level,
95 access_mode,
96 )?;
97 <sqlx::MySql as sqlx::Database>::TransactionManager::begin(c, None)
98 .map_err(sqlx_error_to_query_err)
99 }
100 #[cfg(feature = "sqlx-postgres")]
101 InnerConnection::Postgres(c) => {
102 <sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c, None)
103 .map_err(sqlx_error_to_query_err)?;
104 crate::driver::sqlx_postgres::set_transaction_config(
106 c,
107 isolation_level,
108 access_mode,
109 )
110 }
111 #[cfg(feature = "sqlx-sqlite")]
112 InnerConnection::Sqlite(c) => {
113 crate::driver::sqlx_sqlite::set_transaction_config(
114 c,
115 isolation_level,
116 access_mode,
117 )?;
118 let depth = <sqlx::Sqlite as sqlx::Database>::TransactionManager::get_transaction_depth(c);
119 let statement = if depth == 0 {
120 sqlite_transaction_mode.map(|mode| {
121 sqlx::AssertSqlSafe(format!("BEGIN {}", mode.sqlite_keyword()))
122 .into_sql_str()
123 })
124 } else {
125 None
127 };
128 <sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
129 .map_err(sqlx_error_to_query_err)
130 }
131 #[cfg(feature = "rusqlite")]
132 InnerConnection::Rusqlite(c) => c.begin(sqlite_transaction_mode),
133 #[cfg(feature = "mock")]
134 InnerConnection::Mock(c) => {
135 c.begin();
136 Ok(())
137 }
138 #[cfg(feature = "proxy")]
139 InnerConnection::Proxy(c) => {
140 c.begin();
141 Ok(())
142 }
143 #[allow(unreachable_patterns)]
144 _ => Err(conn_err("Disconnected")),
145 }
146 }
147 );
148
149 begin_result?;
150 Ok(res)
151 }
152
153 #[instrument(level = "trace", skip(callback))]
156 pub(crate) fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
157 where
158 F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
159 E: std::fmt::Display + std::fmt::Debug,
160 {
161 let res = callback(&self).map_err(TransactionError::Transaction);
162 if res.is_ok() {
163 self.commit().map_err(TransactionError::Connection)?;
164 } else {
165 self.rollback().map_err(TransactionError::Connection)?;
166 }
167 res
168 }
169
170 #[instrument(level = "trace", skip(callback))]
174 pub fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
175 where
176 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
177 E: std::fmt::Display + std::fmt::Debug,
178 {
179 let transaction = self.begin().map_err(TransactionError::Connection)?;
180 run_async_transaction_callback(transaction, callback)
181 }
182
183 #[instrument(level = "trace", skip(callback))]
187 pub fn transaction_with_config<F, T, E>(
188 &self,
189 callback: F,
190 isolation_level: Option<IsolationLevel>,
191 access_mode: Option<AccessMode>,
192 ) -> Result<T, TransactionError<E>>
193 where
194 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
195 E: std::fmt::Display + std::fmt::Debug,
196 {
197 let transaction = self
198 .begin_with_config(isolation_level, access_mode)
199 .map_err(TransactionError::Connection)?;
200 run_async_transaction_callback(transaction, callback)
201 }
202
203 #[instrument(level = "trace")]
205 #[allow(unreachable_code, unused_mut)]
206 pub fn commit(mut self) -> Result<(), DbErr> {
207 let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
208 "sea_orm.commit",
209 self.backend,
210 "COMMIT",
211 record_stmt = false,
212 {
213 #[cfg(not(feature = "sync"))]
214 let conn = &mut *self.conn.lock();
215 #[cfg(feature = "sync")]
216 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
217
218 match conn {
219 #[cfg(feature = "sqlx-mysql")]
220 InnerConnection::MySql(c) => {
221 <sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
222 .map_err(sqlx_error_to_query_err)
223 }
224 #[cfg(feature = "sqlx-postgres")]
225 InnerConnection::Postgres(c) => {
226 <sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
227 .map_err(sqlx_error_to_query_err)
228 }
229 #[cfg(feature = "sqlx-sqlite")]
230 InnerConnection::Sqlite(c) => {
231 <sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
232 .map_err(sqlx_error_to_query_err)
233 }
234 #[cfg(feature = "rusqlite")]
235 InnerConnection::Rusqlite(c) => c.commit(),
236 #[cfg(feature = "mock")]
237 InnerConnection::Mock(c) => {
238 c.commit();
239 Ok(())
240 }
241 #[cfg(feature = "proxy")]
242 InnerConnection::Proxy(c) => {
243 c.commit();
244 Ok(())
245 }
246 #[allow(unreachable_patterns)]
247 _ => Err(conn_err("Disconnected")),
248 }
249 }
250 );
251
252 result?;
253 self.open = false; Ok(())
255 }
256
257 #[instrument(level = "trace")]
259 #[allow(unreachable_code, unused_mut)]
260 pub fn rollback(mut self) -> Result<(), DbErr> {
261 let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
262 "sea_orm.rollback",
263 self.backend,
264 "ROLLBACK",
265 record_stmt = false,
266 {
267 #[cfg(not(feature = "sync"))]
268 let conn = &mut *self.conn.lock();
269 #[cfg(feature = "sync")]
270 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
271
272 match conn {
273 #[cfg(feature = "sqlx-mysql")]
274 InnerConnection::MySql(c) => {
275 <sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
276 .map_err(sqlx_error_to_query_err)
277 }
278 #[cfg(feature = "sqlx-postgres")]
279 InnerConnection::Postgres(c) => {
280 <sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
281 .map_err(sqlx_error_to_query_err)
282 }
283 #[cfg(feature = "sqlx-sqlite")]
284 InnerConnection::Sqlite(c) => {
285 <sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
286 .map_err(sqlx_error_to_query_err)
287 }
288 #[cfg(feature = "rusqlite")]
289 InnerConnection::Rusqlite(c) => c.rollback(),
290 #[cfg(feature = "mock")]
291 InnerConnection::Mock(c) => {
292 c.rollback();
293 Ok(())
294 }
295 #[cfg(feature = "proxy")]
296 InnerConnection::Proxy(c) => {
297 c.rollback();
298 Ok(())
299 }
300 #[allow(unreachable_patterns)]
301 _ => Err(conn_err("Disconnected")),
302 }
303 }
304 );
305
306 result?;
307 self.open = false; Ok(())
309 }
310
311 #[instrument(level = "trace")]
313 fn start_rollback(&mut self) -> Result<(), DbErr> {
314 if self.open {
315 if let Some(mut conn) = self.conn.try_lock().ok() {
316 match &mut *conn {
317 #[cfg(feature = "sqlx-mysql")]
318 InnerConnection::MySql(c) => {
319 <sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
320 }
321 #[cfg(feature = "sqlx-postgres")]
322 InnerConnection::Postgres(c) => {
323 <sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
324 }
325 #[cfg(feature = "sqlx-sqlite")]
326 InnerConnection::Sqlite(c) => {
327 <sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
328 }
329 #[cfg(feature = "rusqlite")]
330 InnerConnection::Rusqlite(c) => {
331 c.start_rollback()?;
332 }
333 #[cfg(feature = "mock")]
334 InnerConnection::Mock(c) => {
335 c.rollback();
336 }
337 #[cfg(feature = "proxy")]
338 InnerConnection::Proxy(c) => {
339 c.start_rollback();
340 }
341 #[allow(unreachable_patterns)]
342 _ => return Err(conn_err("Disconnected")),
343 }
344 } else {
345 return Err(conn_err("Dropping a locked Transaction"));
347 }
348 }
349 Ok(())
350 }
351}
352
353impl TransactionSession for DatabaseTransaction {
354 fn commit(self) -> Result<(), DbErr> {
355 self.commit()
356 }
357
358 fn rollback(self) -> Result<(), DbErr> {
359 self.rollback()
360 }
361}
362
363impl Drop for DatabaseTransaction {
364 fn drop(&mut self) {
365 self.start_rollback().expect("Fail to rollback transaction");
366 }
367}
368
369impl ConnectionTrait for DatabaseTransaction {
370 fn get_database_backend(&self) -> DbBackend {
371 self.backend
373 }
374
375 #[instrument(level = "trace")]
376 #[allow(unused_variables)]
377 fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
378 debug_print!("{}", stmt);
379
380 super::tracing_spans::with_db_span!(
381 "sea_orm.execute",
382 self.backend,
383 stmt.sql.as_str(),
384 record_stmt = self.record_stmt_in_spans,
385 {
386 #[cfg(not(feature = "sync"))]
387 let conn = &mut *self.conn.lock();
388 #[cfg(feature = "sync")]
389 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
390
391 match conn {
392 #[cfg(feature = "sqlx-mysql")]
393 InnerConnection::MySql(conn) => {
394 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
395 let conn: &mut sqlx::MySqlConnection = &mut *conn;
396 crate::metric::metric!(self.metric_callback, &stmt, {
397 query.execute(conn).map(Into::into)
398 })
399 .map_err(sqlx_error_to_exec_err)
400 }
401 #[cfg(feature = "sqlx-postgres")]
402 InnerConnection::Postgres(conn) => {
403 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
404 let conn: &mut sqlx::PgConnection = &mut *conn;
405 crate::metric::metric!(self.metric_callback, &stmt, {
406 query.execute(conn).map(Into::into)
407 })
408 .map_err(sqlx_error_to_exec_err)
409 }
410 #[cfg(feature = "sqlx-sqlite")]
411 InnerConnection::Sqlite(conn) => {
412 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
413 let conn: &mut sqlx::SqliteConnection = &mut *conn;
414 crate::metric::metric!(self.metric_callback, &stmt, {
415 query.execute(conn).map(Into::into)
416 })
417 .map_err(sqlx_error_to_exec_err)
418 }
419 #[cfg(feature = "rusqlite")]
420 InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback),
421 #[cfg(feature = "mock")]
422 InnerConnection::Mock(conn) => conn.execute(stmt),
423 #[cfg(feature = "proxy")]
424 InnerConnection::Proxy(conn) => conn.execute(stmt),
425 #[allow(unreachable_patterns)]
426 _ => Err(conn_err("Disconnected")),
427 }
428 }
429 )
430 }
431
432 #[instrument(level = "trace")]
433 #[allow(unused_variables)]
434 fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
435 debug_print!("{}", sql);
436
437 super::tracing_spans::with_db_span!(
438 "sea_orm.execute_unprepared",
439 self.backend,
440 sql,
441 record_stmt = false,
442 {
443 #[cfg(not(feature = "sync"))]
444 let conn = &mut *self.conn.lock();
445 #[cfg(feature = "sync")]
446 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
447
448 match conn {
449 #[cfg(feature = "sqlx-mysql")]
450 InnerConnection::MySql(conn) => {
451 let conn: &mut sqlx::MySqlConnection = &mut *conn;
452 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
453 .map(Into::into)
454 .map_err(sqlx_error_to_exec_err)
455 }
456 #[cfg(feature = "sqlx-postgres")]
457 InnerConnection::Postgres(conn) => {
458 let conn: &mut sqlx::PgConnection = &mut *conn;
459 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
460 .map(Into::into)
461 .map_err(sqlx_error_to_exec_err)
462 }
463 #[cfg(feature = "sqlx-sqlite")]
464 InnerConnection::Sqlite(conn) => {
465 let conn: &mut sqlx::SqliteConnection = &mut *conn;
466 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql.to_owned()))
467 .map(Into::into)
468 .map_err(sqlx_error_to_exec_err)
469 }
470 #[cfg(feature = "rusqlite")]
471 InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql),
472 #[cfg(feature = "mock")]
473 InnerConnection::Mock(conn) => {
474 let db_backend = conn.get_database_backend();
475 let stmt = Statement::from_string(db_backend, sql);
476 conn.execute(stmt)
477 }
478 #[cfg(feature = "proxy")]
479 InnerConnection::Proxy(conn) => {
480 let db_backend = conn.get_database_backend();
481 let stmt = Statement::from_string(db_backend, sql);
482 conn.execute(stmt)
483 }
484 #[allow(unreachable_patterns)]
485 _ => Err(conn_err("Disconnected")),
486 }
487 }
488 )
489 }
490
491 #[instrument(level = "trace")]
492 #[allow(unused_variables)]
493 fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
494 debug_print!("{}", stmt);
495
496 super::tracing_spans::with_db_span!(
497 "sea_orm.query_one",
498 self.backend,
499 stmt.sql.as_str(),
500 record_stmt = self.record_stmt_in_spans,
501 {
502 #[cfg(not(feature = "sync"))]
503 let conn = &mut *self.conn.lock();
504 #[cfg(feature = "sync")]
505 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
506
507 match conn {
508 #[cfg(feature = "sqlx-mysql")]
509 InnerConnection::MySql(conn) => {
510 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
511 let conn: &mut sqlx::MySqlConnection = &mut *conn;
512 crate::metric::metric!(self.metric_callback, &stmt, {
513 crate::sqlx_map_err_ignore_not_found(
514 query.fetch_one(conn).map(|row| Some(row.into())),
515 )
516 })
517 }
518 #[cfg(feature = "sqlx-postgres")]
519 InnerConnection::Postgres(conn) => {
520 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
521 let conn: &mut sqlx::PgConnection = &mut *conn;
522 crate::metric::metric!(self.metric_callback, &stmt, {
523 crate::sqlx_map_err_ignore_not_found(
524 query.fetch_one(conn).map(|row| Some(row.into())),
525 )
526 })
527 }
528 #[cfg(feature = "sqlx-sqlite")]
529 InnerConnection::Sqlite(conn) => {
530 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
531 let conn: &mut sqlx::SqliteConnection = &mut *conn;
532 crate::metric::metric!(self.metric_callback, &stmt, {
533 crate::sqlx_map_err_ignore_not_found(
534 query.fetch_one(conn).map(|row| Some(row.into())),
535 )
536 })
537 }
538 #[cfg(feature = "rusqlite")]
539 InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback),
540 #[cfg(feature = "mock")]
541 InnerConnection::Mock(conn) => conn.query_one(stmt),
542 #[cfg(feature = "proxy")]
543 InnerConnection::Proxy(conn) => conn.query_one(stmt),
544 #[allow(unreachable_patterns)]
545 _ => Err(conn_err("Disconnected")),
546 }
547 }
548 )
549 }
550
551 #[instrument(level = "trace")]
552 #[allow(unused_variables)]
553 fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
554 debug_print!("{}", stmt);
555
556 super::tracing_spans::with_db_span!(
557 "sea_orm.query_all",
558 self.backend,
559 stmt.sql.as_str(),
560 record_stmt = self.record_stmt_in_spans,
561 {
562 #[cfg(not(feature = "sync"))]
563 let conn = &mut *self.conn.lock();
564 #[cfg(feature = "sync")]
565 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
566
567 match conn {
568 #[cfg(feature = "sqlx-mysql")]
569 InnerConnection::MySql(conn) => {
570 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
571 let conn: &mut sqlx::MySqlConnection = &mut *conn;
572 crate::metric::metric!(self.metric_callback, &stmt, {
573 query
574 .fetch_all(conn)
575 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
576 .map_err(sqlx_error_to_query_err)
577 })
578 }
579 #[cfg(feature = "sqlx-postgres")]
580 InnerConnection::Postgres(conn) => {
581 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
582 let conn: &mut sqlx::PgConnection = &mut *conn;
583 crate::metric::metric!(self.metric_callback, &stmt, {
584 query
585 .fetch_all(conn)
586 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
587 .map_err(sqlx_error_to_query_err)
588 })
589 }
590 #[cfg(feature = "sqlx-sqlite")]
591 InnerConnection::Sqlite(conn) => {
592 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
593 let conn: &mut sqlx::SqliteConnection = &mut *conn;
594 crate::metric::metric!(self.metric_callback, &stmt, {
595 query
596 .fetch_all(conn)
597 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
598 .map_err(sqlx_error_to_query_err)
599 })
600 }
601 #[cfg(feature = "rusqlite")]
602 InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback),
603 #[cfg(feature = "mock")]
604 InnerConnection::Mock(conn) => conn.query_all(stmt),
605 #[cfg(feature = "proxy")]
606 InnerConnection::Proxy(conn) => conn.query_all(stmt),
607 #[allow(unreachable_patterns)]
608 _ => Err(conn_err("Disconnected")),
609 }
610 }
611 )
612 }
613}
614
615impl StreamTrait for DatabaseTransaction {
616 type Stream<'a> = TransactionStream<'a>;
617
618 fn get_database_backend(&self) -> DbBackend {
619 self.backend
620 }
621
622 #[instrument(level = "trace")]
623 fn stream_raw<'a>(&'a self, stmt: Statement) -> Result<Self::Stream<'a>, DbErr> {
624 ({
625 #[cfg(not(feature = "sync"))]
626 let conn = self.conn.lock();
627 #[cfg(feature = "sync")]
628 let conn = self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
629 Ok(crate::TransactionStream::build(
630 conn,
631 stmt,
632 self.metric_callback.clone(),
633 ))
634 })
635 }
636}
637
638impl TransactionTrait for DatabaseTransaction {
639 type Transaction = DatabaseTransaction;
640
641 #[instrument(level = "trace")]
642 fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
643 DatabaseTransaction::begin(
644 Arc::clone(&self.conn),
645 self.backend,
646 self.metric_callback.clone(),
647 self.record_stmt_in_spans,
648 None,
649 None,
650 None,
651 )
652 }
653
654 #[instrument(level = "trace")]
655 fn begin_with_config(
656 &self,
657 isolation_level: Option<IsolationLevel>,
658 access_mode: Option<AccessMode>,
659 ) -> Result<DatabaseTransaction, DbErr> {
660 DatabaseTransaction::begin(
661 Arc::clone(&self.conn),
662 self.backend,
663 self.metric_callback.clone(),
664 self.record_stmt_in_spans,
665 isolation_level,
666 access_mode,
667 None,
668 )
669 }
670
671 #[instrument(level = "trace")]
672 fn begin_with_options(
673 &self,
674 options: TransactionOptions,
675 ) -> Result<DatabaseTransaction, DbErr> {
676 DatabaseTransaction::begin(
677 Arc::clone(&self.conn),
678 self.backend,
679 self.metric_callback.clone(),
680 self.record_stmt_in_spans,
681 options.isolation_level,
682 options.access_mode,
683 options.sqlite_transaction_mode,
684 )
685 }
686
687 #[instrument(level = "trace", skip(_callback))]
691 fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
692 where
693 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
694 E: std::fmt::Display + std::fmt::Debug,
695 {
696 let transaction = self.begin().map_err(TransactionError::Connection)?;
697 transaction.run(_callback)
698 }
699
700 #[instrument(level = "trace", skip(_callback))]
704 fn transaction_with_config<F, T, E>(
705 &self,
706 _callback: F,
707 isolation_level: Option<IsolationLevel>,
708 access_mode: Option<AccessMode>,
709 ) -> Result<T, TransactionError<E>>
710 where
711 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
712 E: std::fmt::Display + std::fmt::Debug,
713 {
714 let transaction = self
715 .begin_with_config(isolation_level, access_mode)
716 .map_err(TransactionError::Connection)?;
717 transaction.run(_callback)
718 }
719}
720
721#[derive(Debug)]
723pub enum TransactionError<E> {
724 Connection(DbErr),
726 Transaction(E),
728}
729
730impl<E> std::fmt::Display for TransactionError<E>
731where
732 E: std::fmt::Display + std::fmt::Debug,
733{
734 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
735 match self {
736 TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
737 TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
738 }
739 }
740}
741
742impl<E> std::error::Error for TransactionError<E> where E: std::fmt::Display + std::fmt::Debug {}
743
744impl<E> From<DbErr> for TransactionError<E>
745where
746 E: std::fmt::Display + std::fmt::Debug,
747{
748 fn from(e: DbErr) -> Self {
749 Self::Connection(e)
750 }
751}