sea_orm/database/stream/
transaction.rs

1#![allow(missing_docs)]
2
3use std::ops::DerefMut;
4use tracing::instrument;
5
6#[cfg(feature = "sqlx-dep")]
7use futures_util::TryStreamExt;
8
9use std::sync::MutexGuard;
10
11#[cfg(feature = "sqlx-dep")]
12use sqlx::Executor;
13
14use super::metric::MetricStream;
15#[cfg(feature = "sqlx-dep")]
16use crate::driver::*;
17use crate::{DbErr, InnerConnection, QueryResult, Statement};
18
19/// `TransactionStream` cannot be used in a `transaction` closure as it does not impl `Send`.
20/// It seems to be a Rust limitation right now, and solution to work around this deemed to be extremely hard.
21#[ouroboros::self_referencing]
22pub struct TransactionStream<'a> {
23    stmt: Statement,
24    conn: MutexGuard<'a, InnerConnection>,
25    metric_callback: Option<crate::metric::Callback>,
26    #[borrows(mut conn, stmt, metric_callback)]
27    #[not_covariant]
28    stream: MetricStream<'this>,
29}
30
31impl std::fmt::Debug for TransactionStream<'_> {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "TransactionStream")
34    }
35}
36
37impl TransactionStream<'_> {
38    #[instrument(level = "trace", skip(metric_callback))]
39    #[allow(unused_variables)]
40    pub(crate) fn build(
41        conn: MutexGuard<'_, InnerConnection>,
42        stmt: Statement,
43        metric_callback: Option<crate::metric::Callback>,
44    ) -> TransactionStream<'_> {
45        TransactionStreamBuilder {
46            stmt,
47            conn,
48            metric_callback,
49            stream_builder: |conn, stmt, _metric_callback| match conn.deref_mut() {
50                #[cfg(feature = "sqlx-mysql")]
51                InnerConnection::MySql(c) => {
52                    let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
53                    let start = _metric_callback.is_some().then(std::time::SystemTime::now);
54                    let stream = c
55                        .fetch(query)
56                        .map_ok(Into::into)
57                        .map_err(sqlx_error_to_query_err);
58                    let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
59                    MetricStream::new(_metric_callback, stmt, elapsed, stream)
60                }
61                #[cfg(feature = "sqlx-postgres")]
62                InnerConnection::Postgres(c) => {
63                    let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
64                    let start = _metric_callback.is_some().then(std::time::SystemTime::now);
65                    let stream = c
66                        .fetch(query)
67                        .map_ok(Into::into)
68                        .map_err(sqlx_error_to_query_err);
69                    let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
70                    MetricStream::new(_metric_callback, stmt, elapsed, stream)
71                }
72                #[cfg(feature = "sqlx-sqlite")]
73                InnerConnection::Sqlite(c) => {
74                    let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
75                    let start = _metric_callback.is_some().then(std::time::SystemTime::now);
76                    let stream = c
77                        .fetch(query)
78                        .map_ok(Into::into)
79                        .map_err(sqlx_error_to_query_err);
80                    let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
81                    MetricStream::new(_metric_callback, stmt, elapsed, stream)
82                }
83                #[cfg(feature = "mock")]
84                InnerConnection::Mock(c) => {
85                    let start = _metric_callback.is_some().then(std::time::SystemTime::now);
86                    let stream = c.fetch(stmt);
87                    let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
88                    MetricStream::new(_metric_callback, stmt, elapsed, stream)
89                }
90                #[cfg(feature = "proxy")]
91                InnerConnection::Proxy(c) => {
92                    let start = _metric_callback.is_some().then(std::time::SystemTime::now);
93                    let stream = futures_util::stream::once({
94                        Err(DbErr::BackendNotSupported {
95                            db: "Proxy",
96                            ctx: "TransactionStream",
97                        })
98                    });
99                    let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
100                    MetricStream::new(_metric_callback, stmt, elapsed, stream)
101                }
102                #[allow(unreachable_patterns)]
103                _ => unreachable!(),
104            },
105        }
106        .build()
107    }
108}
109
110#[cfg(not(feature = "sync"))]
111impl Stream for TransactionStream<'_> {
112    type Item = Result<QueryResult, DbErr>;
113
114    fn poll_next(
115        self: Pin<&mut Self>,
116        cx: &mut std::task::Context<'_>,
117    ) -> Poll<Option<Self::Item>> {
118        let this = self.get_mut();
119        this.with_stream_mut(|stream| Pin::new(stream).poll_next(cx))
120    }
121}
122
123#[cfg(feature = "sync")]
124impl Iterator for TransactionStream<'_> {
125    type Item = Result<QueryResult, DbErr>;
126
127    fn next(&mut self) -> Option<Self::Item> {
128        self.with_stream_mut(|stream| stream.next())
129    }
130}