sqlx_tracing/
connection.rs

1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4impl<DB> AsMut<<DB as sqlx::Database>::Connection> for crate::PoolConnection<DB>
5where
6    DB: crate::prelude::Database + sqlx::Database,
7{
8    fn as_mut(&mut self) -> &mut <DB as sqlx::Database>::Connection {
9        self.inner.as_mut()
10    }
11}
12
13impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::PoolConnection<DB>
14where
15    DB: crate::prelude::Database + sqlx::Database,
16    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
17{
18    type Database = DB;
19
20    #[doc(hidden)]
21    fn describe<'e, 'q: 'e>(
22        self,
23        sql: &'q str,
24    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
25    where
26        'c: 'e,
27    {
28        let attrs = &self.attributes;
29        let span = crate::instrument!("sqlx.describe", sql, attrs);
30        let fut = self.inner.as_mut().describe(sql);
31        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
32    }
33
34    fn execute<'e, 'q: 'e, E>(
35        self,
36        query: E,
37    ) -> futures::future::BoxFuture<
38        'e,
39        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
40    >
41    where
42        E: 'q + sqlx::Execute<'q, Self::Database>,
43        'c: 'e,
44    {
45        let sql = query.sql();
46        let attrs = &self.attributes;
47        let span = crate::instrument!("sqlx.execute", sql, attrs);
48        let fut = self.inner.execute(query);
49        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
50    }
51
52    fn execute_many<'e, 'q: 'e, E>(
53        self,
54        query: E,
55    ) -> futures::stream::BoxStream<
56        'e,
57        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
58    >
59    where
60        E: 'q + sqlx::Execute<'q, Self::Database>,
61        'c: 'e,
62    {
63        let sql = query.sql();
64        let attrs = &self.attributes;
65        let span = crate::instrument!("sqlx.execute_many", sql, attrs);
66        let stream = self.inner.execute_many(query);
67        use futures::StreamExt;
68        Box::pin(
69            stream
70                .inspect(move |_| {
71                    let _enter = span.enter();
72                })
73                .inspect_err(crate::span::record_error),
74        )
75    }
76
77    fn fetch<'e, 'q: 'e, E>(
78        self,
79        query: E,
80    ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
81    where
82        E: 'q + sqlx::Execute<'q, Self::Database>,
83        'c: 'e,
84    {
85        let sql = query.sql();
86        let attrs = &self.attributes;
87        let span = crate::instrument!("sqlx.fetch", sql, attrs);
88        let stream = self.inner.fetch(query);
89        use futures::StreamExt;
90        Box::pin(
91            stream
92                .inspect(move |_| {
93                    let _enter = span.enter();
94                })
95                .inspect_err(crate::span::record_error),
96        )
97    }
98
99    fn fetch_all<'e, 'q: 'e, E>(
100        self,
101        query: E,
102    ) -> futures::future::BoxFuture<
103        'e,
104        Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
105    >
106    where
107        E: 'q + sqlx::Execute<'q, Self::Database>,
108        'c: 'e,
109    {
110        let sql = query.sql();
111        let attrs = &self.attributes;
112        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
113        let fut = self.inner.fetch_all(query);
114        Box::pin(
115            async move {
116                fut.await
117                    .inspect(|res| {
118                        let span = tracing::Span::current();
119                        span.record("db.response.returned_rows", res.len());
120                    })
121                    .inspect_err(crate::span::record_error)
122            }
123            .instrument(span),
124        )
125    }
126
127    fn fetch_many<'e, 'q: 'e, E>(
128        self,
129        query: E,
130    ) -> futures::stream::BoxStream<
131        'e,
132        Result<
133            sqlx::Either<
134                <Self::Database as sqlx::Database>::QueryResult,
135                <Self::Database as sqlx::Database>::Row,
136            >,
137            sqlx::Error,
138        >,
139    >
140    where
141        E: 'q + sqlx::Execute<'q, Self::Database>,
142        'c: 'e,
143    {
144        let sql = query.sql();
145        let attrs = &self.attributes;
146        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
147        let stream = self.inner.fetch_many(query);
148        Box::pin(
149            stream
150                .inspect(move |_| {
151                    let _enter = span.enter();
152                })
153                .inspect_err(crate::span::record_error),
154        )
155    }
156
157    fn fetch_one<'e, 'q: 'e, E>(
158        self,
159        query: E,
160    ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
161    where
162        E: 'q + sqlx::Execute<'q, Self::Database>,
163        'c: 'e,
164    {
165        let sql = query.sql();
166        let attrs = &self.attributes;
167        let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
168        let fut = self.inner.fetch_one(query);
169        Box::pin(
170            async move {
171                fut.await
172                    .inspect(|_| {
173                        tracing::Span::current().record("db.response.returned_rows", 1);
174                    })
175                    .inspect_err(crate::span::record_error)
176            }
177            .instrument(span),
178        )
179    }
180
181    fn fetch_optional<'e, 'q: 'e, E>(
182        self,
183        query: E,
184    ) -> futures::future::BoxFuture<
185        'e,
186        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
187    >
188    where
189        E: 'q + sqlx::Execute<'q, Self::Database>,
190        'c: 'e,
191    {
192        let sql = query.sql();
193        let attrs = &self.attributes;
194        let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
195        let fut = self.inner.fetch_optional(query);
196        Box::pin(
197            async move {
198                fut.await
199                    .inspect(|res| {
200                        tracing::Span::current().record(
201                            "db.response.returned_rows",
202                            if res.is_some() { 1 } else { 0 },
203                        );
204                    })
205                    .inspect_err(crate::span::record_error)
206            }
207            .instrument(span),
208        )
209    }
210
211    fn prepare<'e, 'q: 'e>(
212        self,
213        query: &'q str,
214    ) -> futures::future::BoxFuture<
215        'e,
216        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
217    >
218    where
219        'c: 'e,
220    {
221        let attrs = &self.attributes;
222        let span = crate::instrument!("sqlx.prepare", query, attrs);
223        let fut = self.inner.prepare(query);
224        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
225    }
226
227    fn prepare_with<'e, 'q: 'e>(
228        self,
229        sql: &'q str,
230        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
231    ) -> futures::future::BoxFuture<
232        'e,
233        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
234    >
235    where
236        'c: 'e,
237    {
238        let attrs = &self.attributes;
239        let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
240        let fut = self.inner.prepare_with(sql, parameters);
241        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
242    }
243}
244
245impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Connection<'c, DB>
246where
247    DB: crate::prelude::Database + sqlx::Database,
248    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
249{
250    type Database = DB;
251
252    #[doc(hidden)]
253    fn describe<'e, 'q: 'e>(
254        self,
255        sql: &'q str,
256    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
257    where
258        'c: 'e,
259    {
260        let attrs = &self.attributes;
261        let span = crate::instrument!("sqlx.describe", sql, attrs);
262        let fut = self.inner.describe(sql);
263        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
264    }
265
266    fn execute<'e, 'q: 'e, E>(
267        self,
268        query: E,
269    ) -> futures::future::BoxFuture<
270        'e,
271        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
272    >
273    where
274        E: 'q + sqlx::Execute<'q, Self::Database>,
275        'c: 'e,
276    {
277        let sql = query.sql();
278        let attrs = &self.attributes;
279        let span = crate::instrument!("sqlx.execute", sql, attrs);
280        let fut = self.inner.execute(query);
281        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
282    }
283
284    fn execute_many<'e, 'q: 'e, E>(
285        self,
286        query: E,
287    ) -> futures::stream::BoxStream<
288        'e,
289        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
290    >
291    where
292        E: 'q + sqlx::Execute<'q, Self::Database>,
293        'c: 'e,
294    {
295        let sql = query.sql();
296        let attrs = &self.attributes;
297        let span = crate::instrument!("sqlx.execute_many", sql, attrs);
298        let stream = self.inner.execute_many(query);
299        use futures::StreamExt;
300        Box::pin(
301            stream
302                .inspect(move |_| {
303                    let _enter = span.enter();
304                })
305                .inspect_err(crate::span::record_error),
306        )
307    }
308
309    fn fetch<'e, 'q: 'e, E>(
310        self,
311        query: E,
312    ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
313    where
314        E: 'q + sqlx::Execute<'q, Self::Database>,
315        'c: 'e,
316    {
317        let sql = query.sql();
318        let attrs = &self.attributes;
319        let span = crate::instrument!("sqlx.fetch", sql, attrs);
320        let stream = self.inner.fetch(query);
321        use futures::StreamExt;
322        Box::pin(
323            stream
324                .inspect(move |_| {
325                    let _enter = span.enter();
326                })
327                .inspect_err(crate::span::record_error),
328        )
329    }
330
331    fn fetch_all<'e, 'q: 'e, E>(
332        self,
333        query: E,
334    ) -> futures::future::BoxFuture<
335        'e,
336        Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
337    >
338    where
339        E: 'q + sqlx::Execute<'q, Self::Database>,
340        'c: 'e,
341    {
342        let sql = query.sql();
343        let attrs = &self.attributes;
344        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
345        let fut = self.inner.fetch_all(query);
346        Box::pin(
347            async move {
348                fut.await
349                    .inspect(|res| {
350                        let span = tracing::Span::current();
351                        span.record("db.response.returned_rows", res.len());
352                    })
353                    .inspect_err(crate::span::record_error)
354            }
355            .instrument(span),
356        )
357    }
358
359    fn fetch_many<'e, 'q: 'e, E>(
360        self,
361        query: E,
362    ) -> futures::stream::BoxStream<
363        'e,
364        Result<
365            sqlx::Either<
366                <Self::Database as sqlx::Database>::QueryResult,
367                <Self::Database as sqlx::Database>::Row,
368            >,
369            sqlx::Error,
370        >,
371    >
372    where
373        E: 'q + sqlx::Execute<'q, Self::Database>,
374        'c: 'e,
375    {
376        let sql = query.sql();
377        let attrs = &self.attributes;
378        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
379        let stream = self.inner.fetch_many(query);
380        Box::pin(
381            stream
382                .inspect(move |_| {
383                    let _enter = span.enter();
384                })
385                .inspect_err(crate::span::record_error),
386        )
387    }
388
389    fn fetch_one<'e, 'q: 'e, E>(
390        self,
391        query: E,
392    ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
393    where
394        E: 'q + sqlx::Execute<'q, Self::Database>,
395        'c: 'e,
396    {
397        let sql = query.sql();
398        let attrs = &self.attributes;
399        let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
400        let fut = self.inner.fetch_one(query);
401        Box::pin(
402            async move {
403                fut.await
404                    .inspect(crate::span::record_one)
405                    .inspect_err(crate::span::record_error)
406            }
407            .instrument(span),
408        )
409    }
410
411    fn fetch_optional<'e, 'q: 'e, E>(
412        self,
413        query: E,
414    ) -> futures::future::BoxFuture<
415        'e,
416        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
417    >
418    where
419        E: 'q + sqlx::Execute<'q, Self::Database>,
420        'c: 'e,
421    {
422        let sql = query.sql();
423        let attrs = &self.attributes;
424        let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
425        let fut = self.inner.fetch_optional(query);
426        Box::pin(
427            async move {
428                fut.await
429                    .inspect(crate::span::record_optional)
430                    .inspect_err(crate::span::record_error)
431            }
432            .instrument(span),
433        )
434    }
435
436    fn prepare<'e, 'q: 'e>(
437        self,
438        query: &'q str,
439    ) -> futures::future::BoxFuture<
440        'e,
441        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
442    >
443    where
444        'c: 'e,
445    {
446        let attrs = &self.attributes;
447        let span = crate::instrument!("sqlx.prepare", query, attrs);
448        let fut = self.inner.prepare(query);
449        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
450    }
451
452    fn prepare_with<'e, 'q: 'e>(
453        self,
454        sql: &'q str,
455        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
456    ) -> futures::future::BoxFuture<
457        'e,
458        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
459    >
460    where
461        'c: 'e,
462    {
463        let attrs = &self.attributes;
464        let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
465        let fut = self.inner.prepare_with(sql, parameters);
466        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
467    }
468}