1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4impl<DB> AsMut<<DB as sqlx::Database>::Connection> for crate::PoolConnection<DB>
5where
6 DB: crate::prelude::Database + sqlx::Database,
7{
8 fn as_mut(&mut self) -> &mut <DB as sqlx::Database>::Connection {
9 self.inner.as_mut()
10 }
11}
12
13impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::PoolConnection<DB>
14where
15 DB: crate::prelude::Database + sqlx::Database,
16 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
17{
18 type Database = DB;
19
20 #[doc(hidden)]
21 fn describe<'e, 'q: 'e>(
22 self,
23 sql: &'q str,
24 ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
25 where
26 'c: 'e,
27 {
28 let attrs = &self.attributes;
29 let span = crate::instrument!("sqlx.describe", sql, attrs);
30 let fut = self.inner.as_mut().describe(sql);
31 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
32 }
33
34 fn execute<'e, 'q: 'e, E>(
35 self,
36 query: E,
37 ) -> futures::future::BoxFuture<
38 'e,
39 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
40 >
41 where
42 E: 'q + sqlx::Execute<'q, Self::Database>,
43 'c: 'e,
44 {
45 let sql = query.sql();
46 let attrs = &self.attributes;
47 let span = crate::instrument!("sqlx.execute", sql, attrs);
48 let fut = self.inner.execute(query);
49 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
50 }
51
52 fn execute_many<'e, 'q: 'e, E>(
53 self,
54 query: E,
55 ) -> futures::stream::BoxStream<
56 'e,
57 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
58 >
59 where
60 E: 'q + sqlx::Execute<'q, Self::Database>,
61 'c: 'e,
62 {
63 let sql = query.sql();
64 let attrs = &self.attributes;
65 let span = crate::instrument!("sqlx.execute_many", sql, attrs);
66 let stream = self.inner.execute_many(query);
67 use futures::StreamExt;
68 Box::pin(
69 stream
70 .inspect(move |_| {
71 let _enter = span.enter();
72 })
73 .inspect_err(crate::span::record_error),
74 )
75 }
76
77 fn fetch<'e, 'q: 'e, E>(
78 self,
79 query: E,
80 ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
81 where
82 E: 'q + sqlx::Execute<'q, Self::Database>,
83 'c: 'e,
84 {
85 let sql = query.sql();
86 let attrs = &self.attributes;
87 let span = crate::instrument!("sqlx.fetch", sql, attrs);
88 let stream = self.inner.fetch(query);
89 use futures::StreamExt;
90 Box::pin(
91 stream
92 .inspect(move |_| {
93 let _enter = span.enter();
94 })
95 .inspect_err(crate::span::record_error),
96 )
97 }
98
99 fn fetch_all<'e, 'q: 'e, E>(
100 self,
101 query: E,
102 ) -> futures::future::BoxFuture<
103 'e,
104 Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
105 >
106 where
107 E: 'q + sqlx::Execute<'q, Self::Database>,
108 'c: 'e,
109 {
110 let sql = query.sql();
111 let attrs = &self.attributes;
112 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
113 let fut = self.inner.fetch_all(query);
114 Box::pin(
115 async move {
116 fut.await
117 .inspect(|res| {
118 let span = tracing::Span::current();
119 span.record("db.response.returned_rows", res.len());
120 })
121 .inspect_err(crate::span::record_error)
122 }
123 .instrument(span),
124 )
125 }
126
127 fn fetch_many<'e, 'q: 'e, E>(
128 self,
129 query: E,
130 ) -> futures::stream::BoxStream<
131 'e,
132 Result<
133 sqlx::Either<
134 <Self::Database as sqlx::Database>::QueryResult,
135 <Self::Database as sqlx::Database>::Row,
136 >,
137 sqlx::Error,
138 >,
139 >
140 where
141 E: 'q + sqlx::Execute<'q, Self::Database>,
142 'c: 'e,
143 {
144 let sql = query.sql();
145 let attrs = &self.attributes;
146 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
147 let stream = self.inner.fetch_many(query);
148 Box::pin(
149 stream
150 .inspect(move |_| {
151 let _enter = span.enter();
152 })
153 .inspect_err(crate::span::record_error),
154 )
155 }
156
157 fn fetch_one<'e, 'q: 'e, E>(
158 self,
159 query: E,
160 ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
161 where
162 E: 'q + sqlx::Execute<'q, Self::Database>,
163 'c: 'e,
164 {
165 let sql = query.sql();
166 let attrs = &self.attributes;
167 let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
168 let fut = self.inner.fetch_one(query);
169 Box::pin(
170 async move {
171 fut.await
172 .inspect(|_| {
173 tracing::Span::current().record("db.response.returned_rows", 1);
174 })
175 .inspect_err(crate::span::record_error)
176 }
177 .instrument(span),
178 )
179 }
180
181 fn fetch_optional<'e, 'q: 'e, E>(
182 self,
183 query: E,
184 ) -> futures::future::BoxFuture<
185 'e,
186 Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
187 >
188 where
189 E: 'q + sqlx::Execute<'q, Self::Database>,
190 'c: 'e,
191 {
192 let sql = query.sql();
193 let attrs = &self.attributes;
194 let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
195 let fut = self.inner.fetch_optional(query);
196 Box::pin(
197 async move {
198 fut.await
199 .inspect(|res| {
200 tracing::Span::current().record(
201 "db.response.returned_rows",
202 if res.is_some() { 1 } else { 0 },
203 );
204 })
205 .inspect_err(crate::span::record_error)
206 }
207 .instrument(span),
208 )
209 }
210
211 fn prepare<'e, 'q: 'e>(
212 self,
213 query: &'q str,
214 ) -> futures::future::BoxFuture<
215 'e,
216 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
217 >
218 where
219 'c: 'e,
220 {
221 let attrs = &self.attributes;
222 let span = crate::instrument!("sqlx.prepare", query, attrs);
223 let fut = self.inner.prepare(query);
224 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
225 }
226
227 fn prepare_with<'e, 'q: 'e>(
228 self,
229 sql: &'q str,
230 parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
231 ) -> futures::future::BoxFuture<
232 'e,
233 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
234 >
235 where
236 'c: 'e,
237 {
238 let attrs = &self.attributes;
239 let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
240 let fut = self.inner.prepare_with(sql, parameters);
241 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
242 }
243}
244
245impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Connection<'c, DB>
246where
247 DB: crate::prelude::Database + sqlx::Database,
248 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
249{
250 type Database = DB;
251
252 #[doc(hidden)]
253 fn describe<'e, 'q: 'e>(
254 self,
255 sql: &'q str,
256 ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
257 where
258 'c: 'e,
259 {
260 let attrs = &self.attributes;
261 let span = crate::instrument!("sqlx.describe", sql, attrs);
262 let fut = self.inner.describe(sql);
263 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
264 }
265
266 fn execute<'e, 'q: 'e, E>(
267 self,
268 query: E,
269 ) -> futures::future::BoxFuture<
270 'e,
271 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
272 >
273 where
274 E: 'q + sqlx::Execute<'q, Self::Database>,
275 'c: 'e,
276 {
277 let sql = query.sql();
278 let attrs = &self.attributes;
279 let span = crate::instrument!("sqlx.execute", sql, attrs);
280 let fut = self.inner.execute(query);
281 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
282 }
283
284 fn execute_many<'e, 'q: 'e, E>(
285 self,
286 query: E,
287 ) -> futures::stream::BoxStream<
288 'e,
289 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
290 >
291 where
292 E: 'q + sqlx::Execute<'q, Self::Database>,
293 'c: 'e,
294 {
295 let sql = query.sql();
296 let attrs = &self.attributes;
297 let span = crate::instrument!("sqlx.execute_many", sql, attrs);
298 let stream = self.inner.execute_many(query);
299 use futures::StreamExt;
300 Box::pin(
301 stream
302 .inspect(move |_| {
303 let _enter = span.enter();
304 })
305 .inspect_err(crate::span::record_error),
306 )
307 }
308
309 fn fetch<'e, 'q: 'e, E>(
310 self,
311 query: E,
312 ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
313 where
314 E: 'q + sqlx::Execute<'q, Self::Database>,
315 'c: 'e,
316 {
317 let sql = query.sql();
318 let attrs = &self.attributes;
319 let span = crate::instrument!("sqlx.fetch", sql, attrs);
320 let stream = self.inner.fetch(query);
321 use futures::StreamExt;
322 Box::pin(
323 stream
324 .inspect(move |_| {
325 let _enter = span.enter();
326 })
327 .inspect_err(crate::span::record_error),
328 )
329 }
330
331 fn fetch_all<'e, 'q: 'e, E>(
332 self,
333 query: E,
334 ) -> futures::future::BoxFuture<
335 'e,
336 Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
337 >
338 where
339 E: 'q + sqlx::Execute<'q, Self::Database>,
340 'c: 'e,
341 {
342 let sql = query.sql();
343 let attrs = &self.attributes;
344 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
345 let fut = self.inner.fetch_all(query);
346 Box::pin(
347 async move {
348 fut.await
349 .inspect(|res| {
350 let span = tracing::Span::current();
351 span.record("db.response.returned_rows", res.len());
352 })
353 .inspect_err(crate::span::record_error)
354 }
355 .instrument(span),
356 )
357 }
358
359 fn fetch_many<'e, 'q: 'e, E>(
360 self,
361 query: E,
362 ) -> futures::stream::BoxStream<
363 'e,
364 Result<
365 sqlx::Either<
366 <Self::Database as sqlx::Database>::QueryResult,
367 <Self::Database as sqlx::Database>::Row,
368 >,
369 sqlx::Error,
370 >,
371 >
372 where
373 E: 'q + sqlx::Execute<'q, Self::Database>,
374 'c: 'e,
375 {
376 let sql = query.sql();
377 let attrs = &self.attributes;
378 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
379 let stream = self.inner.fetch_many(query);
380 Box::pin(
381 stream
382 .inspect(move |_| {
383 let _enter = span.enter();
384 })
385 .inspect_err(crate::span::record_error),
386 )
387 }
388
389 fn fetch_one<'e, 'q: 'e, E>(
390 self,
391 query: E,
392 ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
393 where
394 E: 'q + sqlx::Execute<'q, Self::Database>,
395 'c: 'e,
396 {
397 let sql = query.sql();
398 let attrs = &self.attributes;
399 let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
400 let fut = self.inner.fetch_one(query);
401 Box::pin(
402 async move {
403 fut.await
404 .inspect(crate::span::record_one)
405 .inspect_err(crate::span::record_error)
406 }
407 .instrument(span),
408 )
409 }
410
411 fn fetch_optional<'e, 'q: 'e, E>(
412 self,
413 query: E,
414 ) -> futures::future::BoxFuture<
415 'e,
416 Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
417 >
418 where
419 E: 'q + sqlx::Execute<'q, Self::Database>,
420 'c: 'e,
421 {
422 let sql = query.sql();
423 let attrs = &self.attributes;
424 let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
425 let fut = self.inner.fetch_optional(query);
426 Box::pin(
427 async move {
428 fut.await
429 .inspect(crate::span::record_optional)
430 .inspect_err(crate::span::record_error)
431 }
432 .instrument(span),
433 )
434 }
435
436 fn prepare<'e, 'q: 'e>(
437 self,
438 query: &'q str,
439 ) -> futures::future::BoxFuture<
440 'e,
441 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
442 >
443 where
444 'c: 'e,
445 {
446 let attrs = &self.attributes;
447 let span = crate::instrument!("sqlx.prepare", query, attrs);
448 let fut = self.inner.prepare(query);
449 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
450 }
451
452 fn prepare_with<'e, 'q: 'e>(
453 self,
454 sql: &'q str,
455 parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
456 ) -> futures::future::BoxFuture<
457 'e,
458 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
459 >
460 where
461 'c: 'e,
462 {
463 let attrs = &self.attributes;
464 let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
465 let fut = self.inner.prepare_with(sql, parameters);
466 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
467 }
468}