sea_orm/database/stream/
transaction.rs1#![allow(missing_docs)]
2
3use std::{ops::DerefMut, pin::Pin, task::Poll};
4use tracing::instrument;
5
6#[cfg(feature = "sqlx-dep")]
7use futures_util::TryStreamExt;
8use futures_util::{Stream, lock::MutexGuard};
9
10#[cfg(feature = "sqlx-dep")]
11use sqlx::Executor;
12
13use super::metric::MetricStream;
14#[cfg(feature = "sqlx-dep")]
15use crate::driver::*;
16use crate::{DbErr, InnerConnection, QueryResult, Statement};
17
#[ouroboros::self_referencing]
/// Stream of query results produced while a connection lock is held.
///
/// This is a self-referencing struct (via `ouroboros`): the `stream` field
/// borrows the `conn`, `stmt` and `metric_callback` fields stored next to it,
/// so all of them are kept together for as long as the stream exists.
pub struct TransactionStream<'a> {
    // Statement handed to the backend; borrowed immutably by `stream`.
    stmt: Statement,
    // Guard over the locked inner connection; borrowed mutably by `stream`.
    conn: MutexGuard<'a, InnerConnection>,
    // Optional metric callback; borrowed immutably by `stream`.
    metric_callback: Option<crate::metric::Callback>,
    // The backend stream wrapped in a `MetricStream`. `'this` is the
    // self-borrow lifetime supplied by `ouroboros`.
    #[borrows(mut conn, stmt, metric_callback)]
    #[not_covariant]
    stream: MetricStream<'this>,
}
29
30impl std::fmt::Debug for TransactionStream<'_> {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "TransactionStream")
33 }
34}
35
36impl TransactionStream<'_> {
37 #[instrument(level = "trace", skip(metric_callback))]
38 #[allow(unused_variables)]
39 pub(crate) fn build(
40 conn: MutexGuard<'_, InnerConnection>,
41 stmt: Statement,
42 metric_callback: Option<crate::metric::Callback>,
43 ) -> TransactionStream<'_> {
44 TransactionStreamBuilder {
45 stmt,
46 conn,
47 metric_callback,
48 stream_builder: |conn, stmt, _metric_callback| match conn.deref_mut() {
49 #[cfg(feature = "sqlx-mysql")]
50 InnerConnection::MySql(c) => {
51 let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
52 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
53 let stream = c
54 .fetch(query)
55 .map_ok(Into::into)
56 .map_err(sqlx_error_to_query_err);
57 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
58 MetricStream::new(_metric_callback, stmt, elapsed, stream)
59 }
60 #[cfg(feature = "sqlx-postgres")]
61 InnerConnection::Postgres(c) => {
62 let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
63 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
64 let stream = c
65 .fetch(query)
66 .map_ok(Into::into)
67 .map_err(sqlx_error_to_query_err);
68 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
69 MetricStream::new(_metric_callback, stmt, elapsed, stream)
70 }
71 #[cfg(feature = "sqlx-sqlite")]
72 InnerConnection::Sqlite(c) => {
73 let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
74 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
75 let stream = c
76 .fetch(query)
77 .map_ok(Into::into)
78 .map_err(sqlx_error_to_query_err);
79 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
80 MetricStream::new(_metric_callback, stmt, elapsed, stream)
81 }
82 #[cfg(feature = "mock")]
83 InnerConnection::Mock(c) => {
84 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
85 let stream = c.fetch(stmt);
86 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
87 MetricStream::new(_metric_callback, stmt, elapsed, stream)
88 }
89 #[cfg(feature = "proxy")]
90 InnerConnection::Proxy(c) => {
91 let start = _metric_callback.is_some().then(std::time::SystemTime::now);
92 let stream = futures_util::stream::once(async {
93 Err(DbErr::BackendNotSupported {
94 db: "Proxy",
95 ctx: "TransactionStream",
96 })
97 });
98 let elapsed = start.map(|s| s.elapsed().unwrap_or_default());
99 MetricStream::new(_metric_callback, stmt, elapsed, stream)
100 }
101 #[allow(unreachable_patterns)]
102 _ => unreachable!(),
103 },
104 }
105 .build()
106 }
107}
108
109impl Stream for TransactionStream<'_> {
110 type Item = Result<QueryResult, DbErr>;
111
112 fn poll_next(
113 self: Pin<&mut Self>,
114 cx: &mut std::task::Context<'_>,
115 ) -> Poll<Option<Self::Item>> {
116 let this = self.get_mut();
117 this.with_stream_mut(|stream| Pin::new(stream).poll_next(cx))
118 }
119}