1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4impl<'c, DB> crate::Transaction<'c, DB>
5where
6 DB: crate::prelude::Database + sqlx::Database,
7 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
8{
9 pub fn executor(&mut self) -> crate::Connection<'_, DB> {
13 crate::Connection {
14 inner: &mut *self.inner,
15 attributes: self.attributes.clone(),
16 }
17 }
18}
19
20impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Transaction<'c, DB>
25where
26 DB: crate::prelude::Database + sqlx::Database,
27 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
28{
29 type Database = DB;
30
31 #[doc(hidden)]
32 fn describe<'e, 'q: 'e>(
33 self,
34 sql: &'q str,
35 ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
36 where
37 'c: 'e,
38 {
39 let attrs = &self.attributes;
40 let span = crate::instrument!("sqlx.describe", sql, attrs);
41 Box::pin(
42 async move {
43 let fut = (&mut self.inner).describe(sql);
44 fut.await.inspect_err(crate::span::record_error)
45 }
46 .instrument(span),
47 )
48 }
49
50 fn execute<'e, 'q: 'e, E>(
51 self,
52 query: E,
53 ) -> futures::future::BoxFuture<
54 'e,
55 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
56 >
57 where
58 E: 'q + sqlx::Execute<'q, Self::Database>,
59 'c: 'e,
60 {
61 let sql = query.sql();
62 let attrs = &self.attributes;
63 let span = crate::instrument!("sqlx.execute", sql, attrs);
64 let fut = (&mut self.inner).execute(query);
65 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
66 }
67
68 fn execute_many<'e, 'q: 'e, E>(
69 self,
70 query: E,
71 ) -> futures::stream::BoxStream<
72 'e,
73 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
74 >
75 where
76 E: 'q + sqlx::Execute<'q, Self::Database>,
77 'c: 'e,
78 {
79 let sql = query.sql();
80 let attrs = &self.attributes;
81 let span = crate::instrument!("sqlx.execute_many", sql, attrs);
82 let stream = (&mut self.inner).execute_many(query);
83 use futures::StreamExt;
84 Box::pin(
85 stream
86 .inspect(move |_| {
87 let _enter = span.enter();
88 })
89 .inspect_err(crate::span::record_error),
90 )
91 }
92
93 fn fetch<'e, 'q: 'e, E>(
94 self,
95 query: E,
96 ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
97 where
98 E: 'q + sqlx::Execute<'q, Self::Database>,
99 'c: 'e,
100 {
101 let sql = query.sql();
102 let attrs = &self.attributes;
103 let span = crate::instrument!("sqlx.fetch", sql, attrs);
104 let stream = (&mut self.inner).fetch(query);
105 use futures::StreamExt;
106 Box::pin(
107 stream
108 .inspect(move |_| {
109 let _enter = span.enter();
110 })
111 .inspect_err(crate::span::record_error),
112 )
113 }
114
115 fn fetch_all<'e, 'q: 'e, E>(
116 self,
117 query: E,
118 ) -> futures::future::BoxFuture<
119 'e,
120 Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
121 >
122 where
123 E: 'q + sqlx::Execute<'q, Self::Database>,
124 'c: 'e,
125 {
126 let sql = query.sql();
127 let attrs = &self.attributes;
128 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
129 let fut = (&mut self.inner).fetch_all(query);
130 Box::pin(
131 async move {
132 fut.await
133 .inspect(|res| {
134 let span = tracing::Span::current();
135 span.record("db.response.returned_rows", res.len());
136 })
137 .inspect_err(crate::span::record_error)
138 }
139 .instrument(span),
140 )
141 }
142
143 fn fetch_many<'e, 'q: 'e, E>(
144 self,
145 query: E,
146 ) -> futures::stream::BoxStream<
147 'e,
148 Result<
149 sqlx::Either<
150 <Self::Database as sqlx::Database>::QueryResult,
151 <Self::Database as sqlx::Database>::Row,
152 >,
153 sqlx::Error,
154 >,
155 >
156 where
157 E: 'q + sqlx::Execute<'q, Self::Database>,
158 'c: 'e,
159 {
160 let sql = query.sql();
161 let attrs = &self.attributes;
162 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
163 let stream = (&mut self.inner).fetch_many(query);
164 Box::pin(
165 stream
166 .inspect(move |_| {
167 let _enter = span.enter();
168 })
169 .inspect_err(crate::span::record_error),
170 )
171 }
172
173 fn fetch_one<'e, 'q: 'e, E>(
174 self,
175 query: E,
176 ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
177 where
178 E: 'q + sqlx::Execute<'q, Self::Database>,
179 'c: 'e,
180 {
181 let sql = query.sql();
182 let attrs = &self.attributes;
183 let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
184 let fut = (&mut self.inner).fetch_one(query);
185 Box::pin(
186 async move {
187 fut.await
188 .inspect(crate::span::record_one)
189 .inspect_err(crate::span::record_error)
190 }
191 .instrument(span),
192 )
193 }
194
195 fn fetch_optional<'e, 'q: 'e, E>(
196 self,
197 query: E,
198 ) -> futures::future::BoxFuture<
199 'e,
200 Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
201 >
202 where
203 E: 'q + sqlx::Execute<'q, Self::Database>,
204 'c: 'e,
205 {
206 let sql = query.sql();
207 let attrs = &self.attributes;
208 let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
209 let fut = (&mut self.inner).fetch_optional(query);
210 Box::pin(
211 async move {
212 fut.await
213 .inspect(crate::span::record_optional)
214 .inspect_err(crate::span::record_error)
215 }
216 .instrument(span),
217 )
218 }
219
220 fn prepare<'e, 'q: 'e>(
221 self,
222 query: &'q str,
223 ) -> futures::future::BoxFuture<
224 'e,
225 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
226 >
227 where
228 'c: 'e,
229 {
230 let attrs = &self.attributes;
231 let span = crate::instrument!("sqlx.prepare", query, attrs);
232 let fut = (&mut self.inner).prepare(query);
233 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
234 }
235
236 fn prepare_with<'e, 'q: 'e>(
237 self,
238 sql: &'q str,
239 parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
240 ) -> futures::future::BoxFuture<
241 'e,
242 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
243 >
244 where
245 'c: 'e,
246 {
247 let attrs = &self.attributes;
248 let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
249 let fut = (&mut self.inner).prepare_with(sql, parameters);
250 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
251 }
252}