1#![allow(unused_assignments)]
2use std::sync::Arc;
3
4#[cfg(feature = "sqlx-dep")]
5use sqlx::TransactionManager;
6use std::sync::Mutex;
7use tracing::instrument;
8
9use crate::{
10 AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
11 QueryResult, SqliteTransactionMode, Statement, StreamTrait, TransactionOptions,
12 TransactionSession, TransactionStream, TransactionTrait, debug_print, error::*,
13};
14#[cfg(feature = "sqlx-dep")]
15use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
16
17pub struct DatabaseTransaction {
22 conn: Arc<Mutex<InnerConnection>>,
23 backend: DbBackend,
24 open: bool,
25 metric_callback: Option<crate::metric::Callback>,
26}
27
28impl std::fmt::Debug for DatabaseTransaction {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "DatabaseTransaction")
31 }
32}
33
34impl DatabaseTransaction {
35 #[instrument(level = "trace", skip(metric_callback))]
36 pub(crate) fn begin(
37 conn: Arc<Mutex<InnerConnection>>,
38 backend: DbBackend,
39 metric_callback: Option<crate::metric::Callback>,
40 isolation_level: Option<IsolationLevel>,
41 access_mode: Option<AccessMode>,
42 sqlite_transaction_mode: Option<SqliteTransactionMode>,
43 ) -> Result<DatabaseTransaction, DbErr> {
44 let res = DatabaseTransaction {
45 conn,
46 backend,
47 open: true,
48 metric_callback,
49 };
50
51 let begin_result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
52 "sea_orm.begin",
53 backend,
54 "BEGIN",
55 record_stmt = false,
56 {
57 #[cfg(not(feature = "sync"))]
58 let conn = &mut *res.conn.lock();
59 #[cfg(feature = "sync")]
60 let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
61
62 match conn {
63 #[cfg(feature = "sqlx-mysql")]
64 InnerConnection::MySql(c) => {
65 crate::driver::sqlx_mysql::set_transaction_config(
67 c,
68 isolation_level,
69 access_mode,
70 )?;
71 <sqlx::MySql as sqlx::Database>::TransactionManager::begin(c, None)
72 .map_err(sqlx_error_to_query_err)
73 }
74 #[cfg(feature = "sqlx-postgres")]
75 InnerConnection::Postgres(c) => {
76 <sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c, None)
77 .map_err(sqlx_error_to_query_err)?;
78 crate::driver::sqlx_postgres::set_transaction_config(
80 c,
81 isolation_level,
82 access_mode,
83 )
84 }
85 #[cfg(feature = "sqlx-sqlite")]
86 InnerConnection::Sqlite(c) => {
87 crate::driver::sqlx_sqlite::set_transaction_config(
88 c,
89 isolation_level,
90 access_mode,
91 )?;
92 let depth = <sqlx::Sqlite as sqlx::Database>::TransactionManager::get_transaction_depth(c);
93 let statement = if depth == 0 {
94 sqlite_transaction_mode.map(|mode| {
95 std::borrow::Cow::from(format!("BEGIN {}", mode.sqlite_keyword()))
96 })
97 } else {
98 None
100 };
101 <sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
102 .map_err(sqlx_error_to_query_err)
103 }
104 #[cfg(feature = "rusqlite")]
105 InnerConnection::Rusqlite(c) => c.begin(sqlite_transaction_mode),
106 #[cfg(feature = "mock")]
107 InnerConnection::Mock(c) => {
108 c.begin();
109 Ok(())
110 }
111 #[cfg(feature = "proxy")]
112 InnerConnection::Proxy(c) => {
113 c.begin();
114 Ok(())
115 }
116 #[allow(unreachable_patterns)]
117 _ => Err(conn_err("Disconnected")),
118 }
119 }
120 );
121
122 begin_result?;
123 Ok(res)
124 }
125
126 #[instrument(level = "trace", skip(callback))]
129 pub(crate) fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
130 where
131 F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
132 E: std::fmt::Display + std::fmt::Debug,
133 {
134 let res = callback(&self).map_err(TransactionError::Transaction);
135 if res.is_ok() {
136 self.commit().map_err(TransactionError::Connection)?;
137 } else {
138 self.rollback().map_err(TransactionError::Connection)?;
139 }
140 res
141 }
142
143 #[instrument(level = "trace")]
145 #[allow(unreachable_code, unused_mut)]
146 pub fn commit(mut self) -> Result<(), DbErr> {
147 let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
148 "sea_orm.commit",
149 self.backend,
150 "COMMIT",
151 record_stmt = false,
152 {
153 #[cfg(not(feature = "sync"))]
154 let conn = &mut *self.conn.lock();
155 #[cfg(feature = "sync")]
156 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
157
158 match conn {
159 #[cfg(feature = "sqlx-mysql")]
160 InnerConnection::MySql(c) => {
161 <sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
162 .map_err(sqlx_error_to_query_err)
163 }
164 #[cfg(feature = "sqlx-postgres")]
165 InnerConnection::Postgres(c) => {
166 <sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
167 .map_err(sqlx_error_to_query_err)
168 }
169 #[cfg(feature = "sqlx-sqlite")]
170 InnerConnection::Sqlite(c) => {
171 <sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
172 .map_err(sqlx_error_to_query_err)
173 }
174 #[cfg(feature = "rusqlite")]
175 InnerConnection::Rusqlite(c) => c.commit(),
176 #[cfg(feature = "mock")]
177 InnerConnection::Mock(c) => {
178 c.commit();
179 Ok(())
180 }
181 #[cfg(feature = "proxy")]
182 InnerConnection::Proxy(c) => {
183 c.commit();
184 Ok(())
185 }
186 #[allow(unreachable_patterns)]
187 _ => Err(conn_err("Disconnected")),
188 }
189 }
190 );
191
192 result?;
193 self.open = false; Ok(())
195 }
196
197 #[instrument(level = "trace")]
199 #[allow(unreachable_code, unused_mut)]
200 pub fn rollback(mut self) -> Result<(), DbErr> {
201 let result: Result<(), DbErr> = super::tracing_spans::with_db_span!(
202 "sea_orm.rollback",
203 self.backend,
204 "ROLLBACK",
205 record_stmt = false,
206 {
207 #[cfg(not(feature = "sync"))]
208 let conn = &mut *self.conn.lock();
209 #[cfg(feature = "sync")]
210 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
211
212 match conn {
213 #[cfg(feature = "sqlx-mysql")]
214 InnerConnection::MySql(c) => {
215 <sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
216 .map_err(sqlx_error_to_query_err)
217 }
218 #[cfg(feature = "sqlx-postgres")]
219 InnerConnection::Postgres(c) => {
220 <sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
221 .map_err(sqlx_error_to_query_err)
222 }
223 #[cfg(feature = "sqlx-sqlite")]
224 InnerConnection::Sqlite(c) => {
225 <sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
226 .map_err(sqlx_error_to_query_err)
227 }
228 #[cfg(feature = "rusqlite")]
229 InnerConnection::Rusqlite(c) => c.rollback(),
230 #[cfg(feature = "mock")]
231 InnerConnection::Mock(c) => {
232 c.rollback();
233 Ok(())
234 }
235 #[cfg(feature = "proxy")]
236 InnerConnection::Proxy(c) => {
237 c.rollback();
238 Ok(())
239 }
240 #[allow(unreachable_patterns)]
241 _ => Err(conn_err("Disconnected")),
242 }
243 }
244 );
245
246 result?;
247 self.open = false; Ok(())
249 }
250
251 #[instrument(level = "trace")]
253 fn start_rollback(&mut self) -> Result<(), DbErr> {
254 if self.open {
255 if let Some(mut conn) = self.conn.try_lock().ok() {
256 match &mut *conn {
257 #[cfg(feature = "sqlx-mysql")]
258 InnerConnection::MySql(c) => {
259 <sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
260 }
261 #[cfg(feature = "sqlx-postgres")]
262 InnerConnection::Postgres(c) => {
263 <sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
264 }
265 #[cfg(feature = "sqlx-sqlite")]
266 InnerConnection::Sqlite(c) => {
267 <sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
268 }
269 #[cfg(feature = "rusqlite")]
270 InnerConnection::Rusqlite(c) => {
271 c.start_rollback()?;
272 }
273 #[cfg(feature = "mock")]
274 InnerConnection::Mock(c) => {
275 c.rollback();
276 }
277 #[cfg(feature = "proxy")]
278 InnerConnection::Proxy(c) => {
279 c.start_rollback();
280 }
281 #[allow(unreachable_patterns)]
282 _ => return Err(conn_err("Disconnected")),
283 }
284 } else {
285 return Err(conn_err("Dropping a locked Transaction"));
287 }
288 }
289 Ok(())
290 }
291}
292
293impl TransactionSession for DatabaseTransaction {
294 fn commit(self) -> Result<(), DbErr> {
295 self.commit()
296 }
297
298 fn rollback(self) -> Result<(), DbErr> {
299 self.rollback()
300 }
301}
302
303impl Drop for DatabaseTransaction {
304 fn drop(&mut self) {
305 self.start_rollback().expect("Fail to rollback transaction");
306 }
307}
308
309impl ConnectionTrait for DatabaseTransaction {
310 fn get_database_backend(&self) -> DbBackend {
311 self.backend
313 }
314
315 #[instrument(level = "trace")]
316 #[allow(unused_variables)]
317 fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
318 debug_print!("{}", stmt);
319
320 super::tracing_spans::with_db_span!(
321 "sea_orm.execute",
322 self.backend,
323 stmt.sql.as_str(),
324 record_stmt = true,
325 {
326 #[cfg(not(feature = "sync"))]
327 let conn = &mut *self.conn.lock();
328 #[cfg(feature = "sync")]
329 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
330
331 match conn {
332 #[cfg(feature = "sqlx-mysql")]
333 InnerConnection::MySql(conn) => {
334 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
335 let conn: &mut sqlx::MySqlConnection = &mut *conn;
336 crate::metric::metric!(self.metric_callback, &stmt, {
337 query.execute(conn).map(Into::into)
338 })
339 .map_err(sqlx_error_to_exec_err)
340 }
341 #[cfg(feature = "sqlx-postgres")]
342 InnerConnection::Postgres(conn) => {
343 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
344 let conn: &mut sqlx::PgConnection = &mut *conn;
345 crate::metric::metric!(self.metric_callback, &stmt, {
346 query.execute(conn).map(Into::into)
347 })
348 .map_err(sqlx_error_to_exec_err)
349 }
350 #[cfg(feature = "sqlx-sqlite")]
351 InnerConnection::Sqlite(conn) => {
352 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
353 let conn: &mut sqlx::SqliteConnection = &mut *conn;
354 crate::metric::metric!(self.metric_callback, &stmt, {
355 query.execute(conn).map(Into::into)
356 })
357 .map_err(sqlx_error_to_exec_err)
358 }
359 #[cfg(feature = "rusqlite")]
360 InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback),
361 #[cfg(feature = "mock")]
362 InnerConnection::Mock(conn) => conn.execute(stmt),
363 #[cfg(feature = "proxy")]
364 InnerConnection::Proxy(conn) => conn.execute(stmt),
365 #[allow(unreachable_patterns)]
366 _ => Err(conn_err("Disconnected")),
367 }
368 }
369 )
370 }
371
372 #[instrument(level = "trace")]
373 #[allow(unused_variables)]
374 fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
375 debug_print!("{}", sql);
376
377 super::tracing_spans::with_db_span!(
378 "sea_orm.execute_unprepared",
379 self.backend,
380 sql,
381 record_stmt = false,
382 {
383 #[cfg(not(feature = "sync"))]
384 let conn = &mut *self.conn.lock();
385 #[cfg(feature = "sync")]
386 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
387
388 match conn {
389 #[cfg(feature = "sqlx-mysql")]
390 InnerConnection::MySql(conn) => {
391 let conn: &mut sqlx::MySqlConnection = &mut *conn;
392 sqlx::Executor::execute(conn, sql)
393 .map(Into::into)
394 .map_err(sqlx_error_to_exec_err)
395 }
396 #[cfg(feature = "sqlx-postgres")]
397 InnerConnection::Postgres(conn) => {
398 let conn: &mut sqlx::PgConnection = &mut *conn;
399 sqlx::Executor::execute(conn, sql)
400 .map(Into::into)
401 .map_err(sqlx_error_to_exec_err)
402 }
403 #[cfg(feature = "sqlx-sqlite")]
404 InnerConnection::Sqlite(conn) => {
405 let conn: &mut sqlx::SqliteConnection = &mut *conn;
406 sqlx::Executor::execute(conn, sql)
407 .map(Into::into)
408 .map_err(sqlx_error_to_exec_err)
409 }
410 #[cfg(feature = "rusqlite")]
411 InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql),
412 #[cfg(feature = "mock")]
413 InnerConnection::Mock(conn) => {
414 let db_backend = conn.get_database_backend();
415 let stmt = Statement::from_string(db_backend, sql);
416 conn.execute(stmt)
417 }
418 #[cfg(feature = "proxy")]
419 InnerConnection::Proxy(conn) => {
420 let db_backend = conn.get_database_backend();
421 let stmt = Statement::from_string(db_backend, sql);
422 conn.execute(stmt)
423 }
424 #[allow(unreachable_patterns)]
425 _ => Err(conn_err("Disconnected")),
426 }
427 }
428 )
429 }
430
431 #[instrument(level = "trace")]
432 #[allow(unused_variables)]
433 fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
434 debug_print!("{}", stmt);
435
436 super::tracing_spans::with_db_span!(
437 "sea_orm.query_one",
438 self.backend,
439 stmt.sql.as_str(),
440 record_stmt = true,
441 {
442 #[cfg(not(feature = "sync"))]
443 let conn = &mut *self.conn.lock();
444 #[cfg(feature = "sync")]
445 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
446
447 match conn {
448 #[cfg(feature = "sqlx-mysql")]
449 InnerConnection::MySql(conn) => {
450 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
451 let conn: &mut sqlx::MySqlConnection = &mut *conn;
452 crate::metric::metric!(self.metric_callback, &stmt, {
453 crate::sqlx_map_err_ignore_not_found(
454 query.fetch_one(conn).map(|row| Some(row.into())),
455 )
456 })
457 }
458 #[cfg(feature = "sqlx-postgres")]
459 InnerConnection::Postgres(conn) => {
460 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
461 let conn: &mut sqlx::PgConnection = &mut *conn;
462 crate::metric::metric!(self.metric_callback, &stmt, {
463 crate::sqlx_map_err_ignore_not_found(
464 query.fetch_one(conn).map(|row| Some(row.into())),
465 )
466 })
467 }
468 #[cfg(feature = "sqlx-sqlite")]
469 InnerConnection::Sqlite(conn) => {
470 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
471 let conn: &mut sqlx::SqliteConnection = &mut *conn;
472 crate::metric::metric!(self.metric_callback, &stmt, {
473 crate::sqlx_map_err_ignore_not_found(
474 query.fetch_one(conn).map(|row| Some(row.into())),
475 )
476 })
477 }
478 #[cfg(feature = "rusqlite")]
479 InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback),
480 #[cfg(feature = "mock")]
481 InnerConnection::Mock(conn) => conn.query_one(stmt),
482 #[cfg(feature = "proxy")]
483 InnerConnection::Proxy(conn) => conn.query_one(stmt),
484 #[allow(unreachable_patterns)]
485 _ => Err(conn_err("Disconnected")),
486 }
487 }
488 )
489 }
490
491 #[instrument(level = "trace")]
492 #[allow(unused_variables)]
493 fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
494 debug_print!("{}", stmt);
495
496 super::tracing_spans::with_db_span!(
497 "sea_orm.query_all",
498 self.backend,
499 stmt.sql.as_str(),
500 record_stmt = true,
501 {
502 #[cfg(not(feature = "sync"))]
503 let conn = &mut *self.conn.lock();
504 #[cfg(feature = "sync")]
505 let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
506
507 match conn {
508 #[cfg(feature = "sqlx-mysql")]
509 InnerConnection::MySql(conn) => {
510 let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
511 let conn: &mut sqlx::MySqlConnection = &mut *conn;
512 crate::metric::metric!(self.metric_callback, &stmt, {
513 query
514 .fetch_all(conn)
515 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
516 .map_err(sqlx_error_to_query_err)
517 })
518 }
519 #[cfg(feature = "sqlx-postgres")]
520 InnerConnection::Postgres(conn) => {
521 let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
522 let conn: &mut sqlx::PgConnection = &mut *conn;
523 crate::metric::metric!(self.metric_callback, &stmt, {
524 query
525 .fetch_all(conn)
526 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
527 .map_err(sqlx_error_to_query_err)
528 })
529 }
530 #[cfg(feature = "sqlx-sqlite")]
531 InnerConnection::Sqlite(conn) => {
532 let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
533 let conn: &mut sqlx::SqliteConnection = &mut *conn;
534 crate::metric::metric!(self.metric_callback, &stmt, {
535 query
536 .fetch_all(conn)
537 .map(|rows| rows.into_iter().map(|r| r.into()).collect())
538 .map_err(sqlx_error_to_query_err)
539 })
540 }
541 #[cfg(feature = "rusqlite")]
542 InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback),
543 #[cfg(feature = "mock")]
544 InnerConnection::Mock(conn) => conn.query_all(stmt),
545 #[cfg(feature = "proxy")]
546 InnerConnection::Proxy(conn) => conn.query_all(stmt),
547 #[allow(unreachable_patterns)]
548 _ => Err(conn_err("Disconnected")),
549 }
550 }
551 )
552 }
553}
554
555impl StreamTrait for DatabaseTransaction {
556 type Stream<'a> = TransactionStream<'a>;
557
558 fn get_database_backend(&self) -> DbBackend {
559 self.backend
560 }
561
562 #[instrument(level = "trace")]
563 fn stream_raw<'a>(&'a self, stmt: Statement) -> Result<Self::Stream<'a>, DbErr> {
564 ({
565 #[cfg(not(feature = "sync"))]
566 let conn = self.conn.lock();
567 #[cfg(feature = "sync")]
568 let conn = self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?;
569 Ok(crate::TransactionStream::build(
570 conn,
571 stmt,
572 self.metric_callback.clone(),
573 ))
574 })
575 }
576}
577
578impl TransactionTrait for DatabaseTransaction {
579 type Transaction = DatabaseTransaction;
580
581 #[instrument(level = "trace")]
582 fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
583 DatabaseTransaction::begin(
584 Arc::clone(&self.conn),
585 self.backend,
586 self.metric_callback.clone(),
587 None,
588 None,
589 None,
590 )
591 }
592
593 #[instrument(level = "trace")]
594 fn begin_with_config(
595 &self,
596 isolation_level: Option<IsolationLevel>,
597 access_mode: Option<AccessMode>,
598 ) -> Result<DatabaseTransaction, DbErr> {
599 DatabaseTransaction::begin(
600 Arc::clone(&self.conn),
601 self.backend,
602 self.metric_callback.clone(),
603 isolation_level,
604 access_mode,
605 None,
606 )
607 }
608
609 #[instrument(level = "trace")]
610 fn begin_with_options(
611 &self,
612 options: TransactionOptions,
613 ) -> Result<DatabaseTransaction, DbErr> {
614 DatabaseTransaction::begin(
615 Arc::clone(&self.conn),
616 self.backend,
617 self.metric_callback.clone(),
618 options.isolation_level,
619 options.access_mode,
620 options.sqlite_transaction_mode,
621 )
622 }
623
624 #[instrument(level = "trace", skip(_callback))]
628 fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
629 where
630 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
631 E: std::fmt::Display + std::fmt::Debug,
632 {
633 let transaction = self.begin().map_err(TransactionError::Connection)?;
634 transaction.run(_callback)
635 }
636
637 #[instrument(level = "trace", skip(_callback))]
641 fn transaction_with_config<F, T, E>(
642 &self,
643 _callback: F,
644 isolation_level: Option<IsolationLevel>,
645 access_mode: Option<AccessMode>,
646 ) -> Result<T, TransactionError<E>>
647 where
648 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
649 E: std::fmt::Display + std::fmt::Debug,
650 {
651 let transaction = self
652 .begin_with_config(isolation_level, access_mode)
653 .map_err(TransactionError::Connection)?;
654 transaction.run(_callback)
655 }
656}
657
658#[derive(Debug)]
660pub enum TransactionError<E> {
661 Connection(DbErr),
663 Transaction(E),
665}
666
667impl<E> std::fmt::Display for TransactionError<E>
668where
669 E: std::fmt::Display + std::fmt::Debug,
670{
671 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
672 match self {
673 TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
674 TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
675 }
676 }
677}
678
679impl<E> std::error::Error for TransactionError<E> where E: std::fmt::Display + std::fmt::Debug {}
680
681impl<E> From<DbErr> for TransactionError<E>
682where
683 E: std::fmt::Display + std::fmt::Debug,
684{
685 fn from(e: DbErr) -> Self {
686 Self::Connection(e)
687 }
688}