sea_orm/database/stream/
transaction.rs1#![allow(missing_docs)]
2
3use std::ops::DerefMut;
4use tracing::instrument;
5
6#[cfg(feature = "sqlx-dep")]
7use futures_util::TryStreamExt;
8
9use std::sync::MutexGuard;
10
11#[cfg(feature = "sqlx-dep")]
12use sqlx::Executor;
13
14use super::metric::MetricStream;
15#[cfg(feature = "sqlx-dep")]
16use crate::driver::*;
17use crate::{DbErr, InnerConnection, QueryResult, Statement};
18
19#[ouroboros::self_referencing]
22pub struct TransactionStream<'a> {
23 stmt: Statement,
24 conn: MutexGuard<'a, InnerConnection>,
25 metric_callback: Option<crate::metric::Callback>,
26 #[borrows(mut conn, stmt, metric_callback)]
27 #[not_covariant]
28 stream: MetricStream<'this>,
29}
30
31impl std::fmt::Debug for TransactionStream<'_> {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(f, "TransactionStream")
34 }
35}
36
37impl TransactionStream<'_> {
38 #[instrument(level = "trace", skip(metric_callback))]
39 #[allow(unused_variables)]
40 pub(crate) fn build(
41 conn: MutexGuard<'_, InnerConnection>,
42 stmt: Statement,
43 metric_callback: Option<crate::metric::Callback>,
44 ) -> TransactionStream<'_> {
45 TransactionStreamBuilder {
46 stmt,
47 conn,
48 metric_callback,
49 stream_builder: |conn, stmt, _metric_callback| match conn.deref_mut() {
50 #[cfg(feature = "sqlx-mysql")]
51 InnerConnection::MySql(c) => {
52 let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
53 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
54 let stream = c
55 .fetch(query)
56 .map_ok(Into::into)
57 .map_err(sqlx_error_to_query_err);
58 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
59 MetricStream::new(_metric_callback, stmt, elapsed, stream)
60 }
61 #[cfg(feature = "sqlx-postgres")]
62 InnerConnection::Postgres(c) => {
63 let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
64 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
65 let stream = c
66 .fetch(query)
67 .map_ok(Into::into)
68 .map_err(sqlx_error_to_query_err);
69 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
70 MetricStream::new(_metric_callback, stmt, elapsed, stream)
71 }
72 #[cfg(feature = "sqlx-sqlite")]
73 InnerConnection::Sqlite(c) => {
74 let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
75 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
76 let stream = c
77 .fetch(query)
78 .map_ok(Into::into)
79 .map_err(sqlx_error_to_query_err);
80 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
81 MetricStream::new(_metric_callback, stmt, elapsed, stream)
82 }
83 #[cfg(feature = "mock")]
84 InnerConnection::Mock(c) => {
85 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
86 let stream = c.fetch(stmt);
87 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
88 MetricStream::new(_metric_callback, stmt, elapsed, stream)
89 }
90 #[cfg(feature = "proxy")]
91 InnerConnection::Proxy(c) => {
92 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
93 let stream = futures_util::stream::once({
94 Err(DbErr::BackendNotSupported {
95 db: "Proxy",
96 ctx: "TransactionStream",
97 })
98 });
99 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
100 MetricStream::new(_metric_callback, stmt, elapsed, stream)
101 }
102 #[allow(unreachable_patterns)]
103 _ => unreachable!(),
104 },
105 }
106 .build()
107 }
108}
109
110#[cfg(not(feature = "sync"))]
111impl Stream for TransactionStream<'_> {
112 type Item = Result<QueryResult, DbErr>;
113
114 fn poll_next(
115 self: Pin<&mut Self>,
116 cx: &mut std::task::Context<'_>,
117 ) -> Poll<Option<Self::Item>> {
118 let this = self.get_mut();
119 this.with_stream_mut(|stream| Pin::new(stream).poll_next(cx))
120 }
121}
122
123#[cfg(feature = "sync")]
124impl Iterator for TransactionStream<'_> {
125 type Item = Result<QueryResult, DbErr>;
126
127 fn next(&mut self) -> Option<Self::Item> {
128 self.with_stream_mut(|stream| stream.next())
129 }
130}