sea_orm/database/stream/
transaction.rs1#![allow(missing_docs)]
2
3use std::{ops::DerefMut, pin::Pin, task::Poll};
4use tracing::instrument;
5
6#[cfg(feature = "sqlx-dep")]
7use futures_util::TryStreamExt;
8use futures_util::{lock::MutexGuard, Stream};
9
10#[cfg(feature = "sqlx-dep")]
11use sqlx::Executor;
12
13use super::metric::MetricStream;
14#[cfg(feature = "sqlx-dep")]
15use crate::driver::*;
16use crate::{DbErr, InnerConnection, QueryResult, Statement};
17
18#[ouroboros::self_referencing]
21pub struct TransactionStream<'a> {
22 stmt: Statement,
23 conn: MutexGuard<'a, InnerConnection>,
24 metric_callback: Option<crate::metric::Callback>,
25 #[borrows(mut conn, stmt, metric_callback)]
26 #[not_covariant]
27 stream: MetricStream<'this>,
28}
29
30impl std::fmt::Debug for TransactionStream<'_> {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "TransactionStream")
33 }
34}
35
36impl TransactionStream<'_> {
37 #[instrument(level = "trace", skip(metric_callback))]
38 #[allow(unused_variables)]
39 pub(crate) fn build(
40 conn: MutexGuard<'_, InnerConnection>,
41 stmt: Statement,
42 metric_callback: Option<crate::metric::Callback>,
43 ) -> TransactionStream<'_> {
44 TransactionStreamBuilder {
45 stmt,
46 conn,
47 metric_callback,
48 stream_builder: |conn, stmt, _metric_callback| match conn.deref_mut() {
49 #[cfg(feature = "sqlx-mysql")]
50 InnerConnection::MySql(c) => {
51 let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
52 let _start = _metric_callback.is_some().then(std::time::SystemTime::now);
53 let stream = c
54 .fetch(query)
55 .map_ok(Into::into)
56 .map_err(sqlx_error_to_query_err);
57 let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
58 MetricStream::new(_metric_callback, stmt, elapsed, stream)
59 }
60 #[cfg(feature = "sqlx-postgres")]
61 InnerConnection::Postgres(c) => {
62 let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
63 let _start = _metric_callback.is_some().then(std::time::SystemTime::now);
64 let stream = c
65 .fetch(query)
66 .map_ok(Into::into)
67 .map_err(sqlx_error_to_query_err);
68 let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
69 MetricStream::new(_metric_callback, stmt, elapsed, stream)
70 }
71 #[cfg(feature = "sqlx-sqlite")]
72 InnerConnection::Sqlite(c) => {
73 let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
74 let _start = _metric_callback.is_some().then(std::time::SystemTime::now);
75 let stream = c
76 .fetch(query)
77 .map_ok(Into::into)
78 .map_err(sqlx_error_to_query_err);
79 let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
80 MetricStream::new(_metric_callback, stmt, elapsed, stream)
81 }
82 #[cfg(feature = "mock")]
83 InnerConnection::Mock(c) => {
84 let _start = _metric_callback.is_some().then(std::time::SystemTime::now);
85 let stream = c.fetch(stmt);
86 let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
87 MetricStream::new(_metric_callback, stmt, elapsed, stream)
88 }
89 #[cfg(feature = "proxy")]
90 InnerConnection::Proxy(c) => {
91 todo!("Proxy connection is not supported")
92 }
93 #[allow(unreachable_patterns)]
94 _ => unreachable!(),
95 },
96 }
97 .build()
98 }
99}
100
101impl Stream for TransactionStream<'_> {
102 type Item = Result<QueryResult, DbErr>;
103
104 fn poll_next(
105 self: Pin<&mut Self>,
106 cx: &mut std::task::Context<'_>,
107 ) -> Poll<Option<Self::Item>> {
108 let this = self.get_mut();
109 this.with_stream_mut(|stream| Pin::new(stream).poll_next(cx))
110 }
111}