1#[cfg(feature = "mysql")]
2use sqlx::MySqlPool;
3#[cfg(feature = "postgres")]
4use sqlx::PgPool;
5use sqlx::{Database, Execute, Executor};
6use tracing::Instrument;
7
8use crate::sqlx_otel_span_macro::query_span_with_metadata;
9
/// Connection metadata describing the database a pool is configured for.
///
/// Values of this type are produced by [`InstrumentedPool::connection_info`].
/// The type is plain data, so it also derives `PartialEq`, `Eq` and `Hash`,
/// which lets callers compare it in assertions or use it as a map/set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionInfo {
    /// Host taken from the pool's connect options.
    pub host: String,
    /// Port taken from the pool's connect options.
    pub port: u16,
    /// Database name taken from the pool's connect options (or the
    /// implementor's fallback when none is configured).
    pub database: String,
    /// Static identifier of the database system, e.g. `"postgresql"` or `"mysql"`.
    pub system: &'static str,
}
17
18pub trait InstrumentedPool: Sized {
19 type Database: Database;
20
21 fn connection_info(&self) -> ConnectionInfo;
22
23 fn as_executor(&self) -> &Self;
24}
25
/// [`InstrumentedPool`] for PostgreSQL pools; compiled only with the
/// `postgres` feature.
#[cfg(feature = "postgres")]
impl InstrumentedPool for PgPool {
    type Database = sqlx::Postgres;

    /// Builds the connection metadata from the pool's connect options.
    ///
    /// If the options carry no database name, the configured user name is
    /// reported instead of a hard-coded `"postgres"`.
    fn connection_info(&self) -> ConnectionInfo {
        let options = self.connect_options();
        // PostgreSQL connects to the database named after the user when no
        // database is requested, so that is the name that is actually in use.
        let database = options
            .get_database()
            .unwrap_or_else(|| options.get_username());
        ConnectionInfo {
            host: options.get_host().to_string(),
            port: options.get_port(),
            database: database.to_string(),
            system: "postgresql",
        }
    }

    /// The pool itself is the executor.
    fn as_executor(&self) -> &Self {
        self
    }
}
44
/// [`InstrumentedPool`] for MySQL pools; compiled only with the `mysql` feature.
#[cfg(feature = "mysql")]
impl InstrumentedPool for MySqlPool {
    type Database = sqlx::MySql;

    /// Reads host, port and database name from the pool's connect options.
    ///
    /// Falls back to the literal `"mysql"` when the options carry no
    /// database name.
    fn connection_info(&self) -> ConnectionInfo {
        let opts = self.connect_options();

        let host = opts.get_host().to_owned();
        let port = opts.get_port();
        let database = match opts.get_database() {
            Some(name) => name.to_owned(),
            None => "mysql".to_owned(),
        };

        ConnectionInfo {
            host,
            port,
            database,
            system: "mysql",
        }
    }

    /// The pool itself is the executor.
    fn as_executor(&self) -> &Self {
        self
    }
}
63
64pub trait InstrumentedFetch<'q, DB: Database>: Sized + Send {
65 type Output: Send;
66
67 fn fetch_one_instrumented<P>(
68 self,
69 pool: &P,
70 sql: impl AsRef<str> + Send,
71 ) -> impl Future<Output = Result<Self::Output, sqlx::Error>> + Send
72 where
73 P: InstrumentedPool<Database = DB> + Send + Sync,
74 for<'c> &'c P: Executor<'c, Database = DB>;
75
76 fn fetch_optional_instrumented<P>(
77 self,
78 pool: &P,
79 sql: impl AsRef<str> + Send,
80 ) -> impl Future<Output = Result<Option<Self::Output>, sqlx::Error>> + Send
81 where
82 P: InstrumentedPool<Database = DB> + Send + Sync,
83 for<'c> &'c P: Executor<'c, Database = DB>;
84
85 fn fetch_all_instrumented<P>(
86 self,
87 pool: &P,
88 sql: impl AsRef<str> + Send,
89 ) -> impl Future<Output = Result<Vec<Self::Output>, sqlx::Error>> + Send
90 where
91 P: InstrumentedPool<Database = DB> + Send + Sync,
92 for<'c> &'c P: Executor<'c, Database = DB>;
93}
94
95pub trait InstrumentedExecute<'q, DB: Database>: Sized + Execute<'q, DB> + Send {
96 fn execute_instrumented<P>(
97 self,
98 pool: &P,
99 sql: impl AsRef<str> + Send,
100 ) -> impl Future<Output = Result<DB::QueryResult, sqlx::Error>> + Send
101 where
102 P: InstrumentedPool<Database = DB> + Send + Sync,
103 for<'c> &'c P: Executor<'c, Database = DB>;
104}
105
106impl<'q, DB, A> InstrumentedFetch<'q, DB> for sqlx::query::Query<'q, DB, A>
107where
108 DB: Database,
109 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
110{
111 type Output = DB::Row;
112
113 async fn fetch_one_instrumented<P>(
114 self,
115 pool: &P,
116 sql: impl AsRef<str> + Send,
117 ) -> Result<Self::Output, sqlx::Error>
118 where
119 P: InstrumentedPool<Database = DB> + Send + Sync,
120 for<'c> &'c P: Executor<'c, Database = DB>,
121 {
122 let span = query_span_with_metadata(sql.as_ref(), pool);
123 self.fetch_one(pool.as_executor()).instrument(span).await
124 }
125
126 async fn fetch_optional_instrumented<P>(
127 self,
128 pool: &P,
129 sql: impl AsRef<str> + Send,
130 ) -> Result<Option<Self::Output>, sqlx::Error>
131 where
132 P: InstrumentedPool<Database = DB> + Send + Sync,
133 for<'c> &'c P: Executor<'c, Database = DB>,
134 {
135 let span = query_span_with_metadata(sql.as_ref(), pool);
136 self.fetch_optional(pool.as_executor())
137 .instrument(span)
138 .await
139 }
140
141 async fn fetch_all_instrumented<P>(
142 self,
143 pool: &P,
144 sql: impl AsRef<str> + Send,
145 ) -> Result<Vec<Self::Output>, sqlx::Error>
146 where
147 P: InstrumentedPool<Database = DB> + Send + Sync,
148 for<'c> &'c P: Executor<'c, Database = DB>,
149 {
150 let span = query_span_with_metadata(sql.as_ref(), pool);
151 self.fetch_all(pool.as_executor()).instrument(span).await
152 }
153}
154
155impl<'q, DB, F, A, O> InstrumentedFetch<'q, DB> for sqlx::query::Map<'q, DB, F, A>
156where
157 DB: Database,
158 F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
159 O: Send + Unpin,
160 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
161{
162 type Output = O;
163
164 async fn fetch_one_instrumented<P>(
165 self,
166 pool: &P,
167 sql: impl AsRef<str> + Send,
168 ) -> Result<Self::Output, sqlx::Error>
169 where
170 P: InstrumentedPool<Database = DB> + Send + Sync,
171 for<'c> &'c P: Executor<'c, Database = DB>,
172 {
173 let span = query_span_with_metadata(sql.as_ref(), pool);
174 self.fetch_one(pool.as_executor()).instrument(span).await
175 }
176
177 async fn fetch_optional_instrumented<P>(
178 self,
179 pool: &P,
180 sql: impl AsRef<str> + Send,
181 ) -> Result<Option<Self::Output>, sqlx::Error>
182 where
183 P: InstrumentedPool<Database = DB> + Send + Sync,
184 for<'c> &'c P: Executor<'c, Database = DB>,
185 {
186 let span = query_span_with_metadata(sql.as_ref(), pool);
187 self.fetch_optional(pool.as_executor())
188 .instrument(span)
189 .await
190 }
191
192 async fn fetch_all_instrumented<P>(
193 self,
194 pool: &P,
195 sql: impl AsRef<str> + Send,
196 ) -> Result<Vec<Self::Output>, sqlx::Error>
197 where
198 P: InstrumentedPool<Database = DB> + Send + Sync,
199 for<'c> &'c P: Executor<'c, Database = DB>,
200 {
201 let span = query_span_with_metadata(sql.as_ref(), pool);
202 self.fetch_all(pool.as_executor()).instrument(span).await
203 }
204}
205
206impl<'q, DB, O, A> InstrumentedFetch<'q, DB> for sqlx::query::QueryScalar<'q, DB, O, A>
207where
208 DB: Database,
209 O: Send + Unpin + for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
210 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
211 usize: sqlx::ColumnIndex<DB::Row>,
212{
213 type Output = O;
214
215 async fn fetch_one_instrumented<P>(
216 self,
217 pool: &P,
218 sql: impl AsRef<str> + Send,
219 ) -> Result<Self::Output, sqlx::Error>
220 where
221 P: InstrumentedPool<Database = DB> + Send + Sync,
222 for<'c> &'c P: Executor<'c, Database = DB>,
223 {
224 let span = query_span_with_metadata(sql.as_ref(), pool);
225 self.fetch_one(pool.as_executor()).instrument(span).await
226 }
227
228 async fn fetch_optional_instrumented<P>(
229 self,
230 pool: &P,
231 sql: impl AsRef<str> + Send,
232 ) -> Result<Option<Self::Output>, sqlx::Error>
233 where
234 P: InstrumentedPool<Database = DB> + Send + Sync,
235 for<'c> &'c P: Executor<'c, Database = DB>,
236 {
237 let span = query_span_with_metadata(sql.as_ref(), pool);
238 self.fetch_optional(pool.as_executor())
239 .instrument(span)
240 .await
241 }
242
243 async fn fetch_all_instrumented<P>(
244 self,
245 pool: &P,
246 sql: impl AsRef<str> + Send,
247 ) -> Result<Vec<Self::Output>, sqlx::Error>
248 where
249 P: InstrumentedPool<Database = DB> + Send + Sync,
250 for<'c> &'c P: Executor<'c, Database = DB>,
251 {
252 let span = query_span_with_metadata(sql.as_ref(), pool);
253 self.fetch_all(pool.as_executor()).instrument(span).await
254 }
255}
256
257impl<'q, DB, A> InstrumentedExecute<'q, DB> for sqlx::query::Query<'q, DB, A>
258where
259 DB: Database,
260 A: 'q + Send + sqlx::IntoArguments<'q, DB>,
261{
262 async fn execute_instrumented<P>(
263 self,
264 pool: &P,
265 sql: impl AsRef<str> + Send,
266 ) -> Result<DB::QueryResult, sqlx::Error>
267 where
268 P: InstrumentedPool<Database = DB> + Send + Sync,
269 for<'c> &'c P: Executor<'c, Database = DB>,
270 {
271 let span = query_span_with_metadata(sql.as_ref(), pool);
272 self.execute(pool.as_executor()).instrument(span).await
273 }
274}