sqlx_tracing/
pool.rs

1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4impl<'p, DB> sqlx::Executor<'p> for &'_ crate::Pool<DB>
5where
6    DB: sqlx::Database + crate::prelude::Database,
7    for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
8{
9    type Database = DB;
10
11    #[doc(hidden)]
12    fn describe<'e, 'q: 'e>(
13        self,
14        sql: &'q str,
15    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>> {
16        let attrs = &self.attributes;
17        let span = crate::instrument!("sqlx.describe", sql, attrs);
18        let fut = self.inner.describe(sql);
19        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
20    }
21
22    fn execute<'e, 'q: 'e, E>(
23        self,
24        query: E,
25    ) -> futures::future::BoxFuture<
26        'e,
27        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
28    >
29    where
30        E: 'q + sqlx::Execute<'q, Self::Database>,
31    {
32        let sql = query.sql();
33        let attrs = &self.attributes;
34        let span = crate::instrument!("sqlx.execute", sql, attrs);
35        let fut = self.inner.execute(query);
36        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
37    }
38
39    fn execute_many<'e, 'q: 'e, E>(
40        self,
41        query: E,
42    ) -> futures::stream::BoxStream<
43        'e,
44        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
45    >
46    where
47        E: 'q + sqlx::Execute<'q, Self::Database>,
48    {
49        let sql = query.sql();
50        let attrs = &self.attributes;
51        let span = crate::instrument!("sqlx.execute_many", sql, attrs);
52        let stream = self.inner.execute_many(query);
53        use futures::StreamExt;
54        Box::pin(
55            stream
56                .inspect(move |_| {
57                    let _enter = span.enter();
58                })
59                .inspect_err(crate::span::record_error),
60        )
61    }
62
63    fn fetch<'e, 'q: 'e, E>(
64        self,
65        query: E,
66    ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
67    where
68        E: 'q + sqlx::Execute<'q, Self::Database>,
69    {
70        let sql = query.sql();
71        let attrs = &self.attributes;
72        let span = crate::instrument!("sqlx.fetch", sql, attrs);
73        let stream = self.inner.fetch(query);
74        use futures::StreamExt;
75        Box::pin(
76            stream
77                .inspect(move |_| {
78                    let _enter = span.enter();
79                })
80                .inspect_err(crate::span::record_error),
81        )
82    }
83
84    fn fetch_all<'e, 'q: 'e, E>(
85        self,
86        query: E,
87    ) -> futures::future::BoxFuture<
88        'e,
89        Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
90    >
91    where
92        E: 'q + sqlx::Execute<'q, Self::Database>,
93    {
94        let sql = query.sql();
95        let attrs = &self.attributes;
96        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
97        let fut = self.inner.fetch_all(query);
98        Box::pin(
99            async move {
100                fut.await
101                    .inspect(|res| {
102                        let span = tracing::Span::current();
103                        span.record("db.response.returned_rows", res.len());
104                    })
105                    .inspect_err(crate::span::record_error)
106            }
107            .instrument(span),
108        )
109    }
110
111    fn fetch_many<'e, 'q: 'e, E>(
112        self,
113        query: E,
114    ) -> futures::stream::BoxStream<
115        'e,
116        Result<
117            sqlx::Either<
118                <Self::Database as sqlx::Database>::QueryResult,
119                <Self::Database as sqlx::Database>::Row,
120            >,
121            sqlx::Error,
122        >,
123    >
124    where
125        E: 'q + sqlx::Execute<'q, Self::Database>,
126    {
127        let sql = query.sql();
128        let attrs = &self.attributes;
129        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
130        let stream = self.inner.fetch_many(query);
131        Box::pin(
132            stream
133                .inspect(move |_| {
134                    let _enter = span.enter();
135                })
136                .inspect_err(crate::span::record_error),
137        )
138    }
139
140    fn fetch_one<'e, 'q: 'e, E>(
141        self,
142        query: E,
143    ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
144    where
145        E: 'q + sqlx::Execute<'q, Self::Database>,
146    {
147        let sql = query.sql();
148        let attrs = &self.attributes;
149        let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
150        let fut = self.inner.fetch_one(query);
151        Box::pin(
152            async move {
153                fut.await
154                    .inspect(crate::span::record_one)
155                    .inspect_err(crate::span::record_error)
156            }
157            .instrument(span),
158        )
159    }
160
161    fn fetch_optional<'e, 'q: 'e, E>(
162        self,
163        query: E,
164    ) -> futures::future::BoxFuture<
165        'e,
166        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
167    >
168    where
169        E: 'q + sqlx::Execute<'q, Self::Database>,
170    {
171        let sql = query.sql();
172        let attrs = &self.attributes;
173        let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
174        let fut = self.inner.fetch_optional(query);
175        Box::pin(
176            async move {
177                fut.await
178                    .inspect(crate::span::record_optional)
179                    .inspect_err(crate::span::record_error)
180            }
181            .instrument(span),
182        )
183    }
184
185    fn prepare<'e, 'q: 'e>(
186        self,
187        query: &'q str,
188    ) -> futures::future::BoxFuture<
189        'e,
190        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
191    > {
192        let attrs = &self.attributes;
193        let span = crate::instrument!("sqlx.prepare", query, attrs);
194        let fut = self.inner.prepare(query);
195        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
196    }
197
198    fn prepare_with<'e, 'q: 'e>(
199        self,
200        sql: &'q str,
201        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
202    ) -> futures::future::BoxFuture<
203        'e,
204        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
205    > {
206        let attrs = &self.attributes;
207        let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
208        let fut = self.inner.prepare_with(sql, parameters);
209        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
210    }
211}