sqlx_tracing/
pool.rs

1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4fn record_error(err: &sqlx::Error) {
5    let span = tracing::Span::current();
6    span.record("otel.status_code", "error");
7    match err {
8        sqlx::Error::ColumnIndexOutOfBounds { .. }
9        | sqlx::Error::ColumnDecode { .. }
10        | sqlx::Error::ColumnNotFound(_)
11        | sqlx::Error::Decode { .. }
12        | sqlx::Error::Encode { .. }
13        | sqlx::Error::RowNotFound
14        | sqlx::Error::TypeNotFound { .. } => {
15            span.record("error.kind", "client");
16        }
17        _ => {
18            span.record("error.kind", "server");
19        }
20    }
21    span.record("error.message", err.to_string());
22    span.record("error.stacktrace", format!("{err:?}"));
23}
24
25impl<'p, DB> sqlx::Executor<'p> for &'_ crate::Pool<DB>
26where
27    DB: sqlx::Database,
28    DB: crate::prelude::Database,
29    for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
30{
31    type Database = DB;
32
33    #[doc(hidden)]
34    fn describe<'e, 'q: 'e>(
35        self,
36        sql: &'q str,
37    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>> {
38        let span = crate::query_span!("sqlx.describe", sql);
39        let fut = self.0.describe(sql).instrument(span);
40        Box::pin(async move { fut.await.inspect_err(record_error) })
41    }
42
43    fn execute<'e, 'q: 'e, E>(
44        self,
45        query: E,
46    ) -> futures::future::BoxFuture<
47        'e,
48        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
49    >
50    where
51        E: 'q + sqlx::Execute<'q, Self::Database>,
52    {
53        let sql = query.sql();
54        let span = crate::query_span!("sqlx.execute", sql);
55        let fut = self.0.execute(query).instrument(span);
56        Box::pin(async move { fut.await.inspect_err(record_error) })
57    }
58
59    fn execute_many<'e, 'q: 'e, E>(
60        self,
61        query: E,
62    ) -> futures::stream::BoxStream<
63        'e,
64        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
65    >
66    where
67        E: 'q + sqlx::Execute<'q, Self::Database>,
68    {
69        let sql = query.sql();
70        let span = crate::query_span!("sqlx.execute_many", sql);
71        let stream = self.0.execute_many(query);
72        use futures::StreamExt;
73        Box::pin(
74            stream
75                .inspect(move |_| {
76                    let _enter = span.enter();
77                })
78                .inspect_err(record_error),
79        )
80    }
81
82    fn fetch<'e, 'q: 'e, E>(
83        self,
84        query: E,
85    ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
86    where
87        E: 'q + sqlx::Execute<'q, Self::Database>,
88    {
89        let sql = query.sql();
90        let span = crate::query_span!("sqlx.fetch", sql);
91        let stream = self.0.fetch(query);
92        use futures::StreamExt;
93        Box::pin(
94            stream
95                .inspect(move |_| {
96                    let _enter = span.enter();
97                })
98                .inspect_err(record_error),
99        )
100    }
101
102    fn fetch_all<'e, 'q: 'e, E>(
103        self,
104        query: E,
105    ) -> futures::future::BoxFuture<
106        'e,
107        Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
108    >
109    where
110        E: 'q + sqlx::Execute<'q, Self::Database>,
111    {
112        let sql = query.sql();
113        let span = crate::query_span!("sqlx.fetch_all", sql);
114        let fut = self.0.fetch_all(query).instrument(span);
115        Box::pin(async move {
116            fut.await
117                .inspect(|res| {
118                    let span = tracing::Span::current();
119                    span.record("db.response.returned_rows", res.len());
120                })
121                .inspect_err(record_error)
122        })
123    }
124
125    fn fetch_many<'e, 'q: 'e, E>(
126        self,
127        query: E,
128    ) -> futures::stream::BoxStream<
129        'e,
130        Result<
131            sqlx::Either<
132                <Self::Database as sqlx::Database>::QueryResult,
133                <Self::Database as sqlx::Database>::Row,
134            >,
135            sqlx::Error,
136        >,
137    >
138    where
139        E: 'q + sqlx::Execute<'q, Self::Database>,
140    {
141        let sql = query.sql();
142        let span = crate::query_span!("sqlx.fetch_all", sql);
143        let stream = self.0.fetch_many(query);
144        Box::pin(
145            stream
146                .inspect(move |_| {
147                    let _enter = span.enter();
148                })
149                .inspect_err(record_error),
150        )
151    }
152
153    fn fetch_one<'e, 'q: 'e, E>(
154        self,
155        query: E,
156    ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
157    where
158        E: 'q + sqlx::Execute<'q, Self::Database>,
159    {
160        let sql = query.sql();
161        let span = crate::query_span!("sqlx.fetch_one", sql);
162        let fut = self.0.fetch_one(query).instrument(span);
163        Box::pin(async move {
164            fut.await
165                .inspect(|_| {
166                    tracing::Span::current().record("db.response.returned_rows", 1);
167                })
168                .inspect_err(record_error)
169        })
170    }
171
172    fn fetch_optional<'e, 'q: 'e, E>(
173        self,
174        query: E,
175    ) -> futures::future::BoxFuture<
176        'e,
177        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
178    >
179    where
180        E: 'q + sqlx::Execute<'q, Self::Database>,
181    {
182        let sql = query.sql();
183        let span = crate::query_span!("sqlx.fetch_optional", sql);
184        let fut = self.0.fetch_optional(query).instrument(span);
185        Box::pin(async move {
186            fut.await
187                .inspect(|res| {
188                    tracing::Span::current().record(
189                        "db.response.returned_rows",
190                        if res.is_some() { 1 } else { 0 },
191                    );
192                })
193                .inspect_err(record_error)
194        })
195    }
196
197    fn prepare<'e, 'q: 'e>(
198        self,
199        query: &'q str,
200    ) -> futures::future::BoxFuture<
201        'e,
202        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
203    > {
204        let span = crate::query_span!("sqlx.prepare", query);
205        let fut = self.0.prepare(query).instrument(span);
206        Box::pin(async move { fut.await.inspect_err(record_error) })
207    }
208
209    fn prepare_with<'e, 'q: 'e>(
210        self,
211        sql: &'q str,
212        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
213    ) -> futures::future::BoxFuture<
214        'e,
215        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
216    > {
217        let span = crate::query_span!("sqlx.prepare_with", sql);
218        let fut = self.0.prepare_with(sql, parameters).instrument(span);
219        Box::pin(async move { fut.await.inspect_err(record_error) })
220    }
221}