1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4fn record_error(err: &sqlx::Error) {
5 let span = tracing::Span::current();
6 span.record("otel.status_code", "error");
7 match err {
8 sqlx::Error::ColumnIndexOutOfBounds { .. }
9 | sqlx::Error::ColumnDecode { .. }
10 | sqlx::Error::ColumnNotFound(_)
11 | sqlx::Error::Decode { .. }
12 | sqlx::Error::Encode { .. }
13 | sqlx::Error::RowNotFound
14 | sqlx::Error::TypeNotFound { .. } => {
15 span.record("error.kind", "client");
16 }
17 _ => {
18 span.record("error.kind", "server");
19 }
20 }
21 span.record("error.message", err.to_string());
22 span.record("error.stacktrace", format!("{err:?}"));
23}
24
25impl<'p, DB> sqlx::Executor<'p> for &'_ crate::Pool<DB>
26where
27 DB: sqlx::Database,
28 DB: crate::prelude::Database,
29 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
30{
31 type Database = DB;
32
33 #[doc(hidden)]
34 fn describe<'e, 'q: 'e>(
35 self,
36 sql: &'q str,
37 ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>> {
38 let span = crate::query_span!("sqlx.describe", sql);
39 let fut = self.0.describe(sql).instrument(span);
40 Box::pin(async move { fut.await.inspect_err(record_error) })
41 }
42
43 fn execute<'e, 'q: 'e, E>(
44 self,
45 query: E,
46 ) -> futures::future::BoxFuture<
47 'e,
48 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
49 >
50 where
51 E: 'q + sqlx::Execute<'q, Self::Database>,
52 {
53 let sql = query.sql();
54 let span = crate::query_span!("sqlx.execute", sql);
55 let fut = self.0.execute(query).instrument(span);
56 Box::pin(async move { fut.await.inspect_err(record_error) })
57 }
58
59 fn execute_many<'e, 'q: 'e, E>(
60 self,
61 query: E,
62 ) -> futures::stream::BoxStream<
63 'e,
64 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
65 >
66 where
67 E: 'q + sqlx::Execute<'q, Self::Database>,
68 {
69 let sql = query.sql();
70 let span = crate::query_span!("sqlx.execute_many", sql);
71 let stream = self.0.execute_many(query);
72 use futures::StreamExt;
73 Box::pin(
74 stream
75 .inspect(move |_| {
76 let _enter = span.enter();
77 })
78 .inspect_err(record_error),
79 )
80 }
81
82 fn fetch<'e, 'q: 'e, E>(
83 self,
84 query: E,
85 ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
86 where
87 E: 'q + sqlx::Execute<'q, Self::Database>,
88 {
89 let sql = query.sql();
90 let span = crate::query_span!("sqlx.fetch", sql);
91 let stream = self.0.fetch(query);
92 use futures::StreamExt;
93 Box::pin(
94 stream
95 .inspect(move |_| {
96 let _enter = span.enter();
97 })
98 .inspect_err(record_error),
99 )
100 }
101
102 fn fetch_all<'e, 'q: 'e, E>(
103 self,
104 query: E,
105 ) -> futures::future::BoxFuture<
106 'e,
107 Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
108 >
109 where
110 E: 'q + sqlx::Execute<'q, Self::Database>,
111 {
112 let sql = query.sql();
113 let span = crate::query_span!("sqlx.fetch_all", sql);
114 let fut = self.0.fetch_all(query).instrument(span);
115 Box::pin(async move {
116 fut.await
117 .inspect(|res| {
118 let span = tracing::Span::current();
119 span.record("db.response.returned_rows", res.len());
120 })
121 .inspect_err(record_error)
122 })
123 }
124
125 fn fetch_many<'e, 'q: 'e, E>(
126 self,
127 query: E,
128 ) -> futures::stream::BoxStream<
129 'e,
130 Result<
131 sqlx::Either<
132 <Self::Database as sqlx::Database>::QueryResult,
133 <Self::Database as sqlx::Database>::Row,
134 >,
135 sqlx::Error,
136 >,
137 >
138 where
139 E: 'q + sqlx::Execute<'q, Self::Database>,
140 {
141 let sql = query.sql();
142 let span = crate::query_span!("sqlx.fetch_all", sql);
143 let stream = self.0.fetch_many(query);
144 Box::pin(
145 stream
146 .inspect(move |_| {
147 let _enter = span.enter();
148 })
149 .inspect_err(record_error),
150 )
151 }
152
153 fn fetch_one<'e, 'q: 'e, E>(
154 self,
155 query: E,
156 ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
157 where
158 E: 'q + sqlx::Execute<'q, Self::Database>,
159 {
160 let sql = query.sql();
161 let span = crate::query_span!("sqlx.fetch_one", sql);
162 let fut = self.0.fetch_one(query).instrument(span);
163 Box::pin(async move {
164 fut.await
165 .inspect(|_| {
166 tracing::Span::current().record("db.response.returned_rows", 1);
167 })
168 .inspect_err(record_error)
169 })
170 }
171
172 fn fetch_optional<'e, 'q: 'e, E>(
173 self,
174 query: E,
175 ) -> futures::future::BoxFuture<
176 'e,
177 Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
178 >
179 where
180 E: 'q + sqlx::Execute<'q, Self::Database>,
181 {
182 let sql = query.sql();
183 let span = crate::query_span!("sqlx.fetch_optional", sql);
184 let fut = self.0.fetch_optional(query).instrument(span);
185 Box::pin(async move {
186 fut.await
187 .inspect(|res| {
188 tracing::Span::current().record(
189 "db.response.returned_rows",
190 if res.is_some() { 1 } else { 0 },
191 );
192 })
193 .inspect_err(record_error)
194 })
195 }
196
197 fn prepare<'e, 'q: 'e>(
198 self,
199 query: &'q str,
200 ) -> futures::future::BoxFuture<
201 'e,
202 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
203 > {
204 let span = crate::query_span!("sqlx.prepare", query);
205 let fut = self.0.prepare(query).instrument(span);
206 Box::pin(async move { fut.await.inspect_err(record_error) })
207 }
208
209 fn prepare_with<'e, 'q: 'e>(
210 self,
211 sql: &'q str,
212 parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
213 ) -> futures::future::BoxFuture<
214 'e,
215 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
216 > {
217 let span = crate::query_span!("sqlx.prepare_with", sql);
218 let fut = self.0.prepare_with(sql, parameters).instrument(span);
219 Box::pin(async move { fut.await.inspect_err(record_error) })
220 }
221}