dogdata_sqlx/
execute.rs

1#[cfg(feature = "mysql")]
2use sqlx::MySqlPool;
3#[cfg(feature = "postgres")]
4use sqlx::PgPool;
5use sqlx::{Database, Execute, Executor};
6use tracing::Instrument;
7
8use crate::sqlx_otel_span_macro::query_span_with_metadata;
9
/// Describes the database endpoint a pool is configured to connect to.
///
/// Values are produced by [`InstrumentedPool::connection_info`].
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    /// Host the pool connects to, as reported by the pool's connect options.
    pub host: String,
    /// Port the pool connects to.
    pub port: u16,
    /// Name of the database the pool targets.
    pub database: String,
    /// Static identifier of the database system (for example `"postgresql"` or `"mysql"`).
    pub system: &'static str,
}
17
18pub trait InstrumentedPool: Sized {
19    type Database: Database;
20
21    fn connection_info(&self) -> ConnectionInfo;
22
23    fn as_executor(&self) -> &Self;
24}
25
/// Instrumentation support for PostgreSQL pools.
#[cfg(feature = "postgres")]
impl InstrumentedPool for PgPool {
    type Database = sqlx::Postgres;

    /// Builds a [`ConnectionInfo`] from the pool's connect options.
    ///
    /// When no database name was configured, the user name is reported
    /// instead: PostgreSQL connects to the database named after the user in
    /// that case, so a hard-coded `"postgres"` would mislabel the connection
    /// for any other user.
    fn connection_info(&self) -> ConnectionInfo {
        let options = self.connect_options();
        let database = options
            .get_database()
            .unwrap_or_else(|| options.get_username());
        ConnectionInfo {
            host: options.get_host().to_string(),
            port: options.get_port(),
            database: database.to_string(),
            system: "postgresql",
        }
    }

    /// The pool itself is used as the executor.
    fn as_executor(&self) -> &Self {
        self
    }
}
44
/// Instrumentation support for MySQL pools.
#[cfg(feature = "mysql")]
impl InstrumentedPool for MySqlPool {
    type Database = sqlx::MySql;

    /// Builds a [`ConnectionInfo`] from the pool's connect options.
    ///
    /// Falls back to the name `"mysql"` when no database was configured.
    fn connection_info(&self) -> ConnectionInfo {
        let opts = self.connect_options();
        let host = opts.get_host().to_string();
        let port = opts.get_port();
        let database = match opts.get_database() {
            Some(name) => name.to_string(),
            None => "mysql".to_string(),
        };
        ConnectionInfo {
            host,
            port,
            database,
            system: "mysql",
        }
    }

    /// The pool itself is used as the executor.
    fn as_executor(&self) -> &Self {
        self
    }
}
63
64pub trait InstrumentedFetch<'q, DB: Database>: Sized + Send {
65    type Output: Send;
66
67    fn fetch_one_instrumented<P>(
68        self,
69        pool: &P,
70        sql: impl AsRef<str> + Send,
71    ) -> impl Future<Output = Result<Self::Output, sqlx::Error>> + Send
72    where
73        P: InstrumentedPool<Database = DB> + Send + Sync,
74        for<'c> &'c P: Executor<'c, Database = DB>;
75
76    fn fetch_optional_instrumented<P>(
77        self,
78        pool: &P,
79        sql: impl AsRef<str> + Send,
80    ) -> impl Future<Output = Result<Option<Self::Output>, sqlx::Error>> + Send
81    where
82        P: InstrumentedPool<Database = DB> + Send + Sync,
83        for<'c> &'c P: Executor<'c, Database = DB>;
84
85    fn fetch_all_instrumented<P>(
86        self,
87        pool: &P,
88        sql: impl AsRef<str> + Send,
89    ) -> impl Future<Output = Result<Vec<Self::Output>, sqlx::Error>> + Send
90    where
91        P: InstrumentedPool<Database = DB> + Send + Sync,
92        for<'c> &'c P: Executor<'c, Database = DB>;
93}
94
95pub trait InstrumentedExecute<'q, DB: Database>: Sized + Execute<'q, DB> + Send {
96    fn execute_instrumented<P>(
97        self,
98        pool: &P,
99        sql: impl AsRef<str> + Send,
100    ) -> impl Future<Output = Result<DB::QueryResult, sqlx::Error>> + Send
101    where
102        P: InstrumentedPool<Database = DB> + Send + Sync,
103        for<'c> &'c P: Executor<'c, Database = DB>;
104}
105
106impl<'q, DB, A> InstrumentedFetch<'q, DB> for sqlx::query::Query<'q, DB, A>
107where
108    DB: Database,
109    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
110{
111    type Output = DB::Row;
112
113    async fn fetch_one_instrumented<P>(
114        self,
115        pool: &P,
116        sql: impl AsRef<str> + Send,
117    ) -> Result<Self::Output, sqlx::Error>
118    where
119        P: InstrumentedPool<Database = DB> + Send + Sync,
120        for<'c> &'c P: Executor<'c, Database = DB>,
121    {
122        let span = query_span_with_metadata(sql.as_ref(), pool);
123        self.fetch_one(pool.as_executor()).instrument(span).await
124    }
125
126    async fn fetch_optional_instrumented<P>(
127        self,
128        pool: &P,
129        sql: impl AsRef<str> + Send,
130    ) -> Result<Option<Self::Output>, sqlx::Error>
131    where
132        P: InstrumentedPool<Database = DB> + Send + Sync,
133        for<'c> &'c P: Executor<'c, Database = DB>,
134    {
135        let span = query_span_with_metadata(sql.as_ref(), pool);
136        self.fetch_optional(pool.as_executor())
137            .instrument(span)
138            .await
139    }
140
141    async fn fetch_all_instrumented<P>(
142        self,
143        pool: &P,
144        sql: impl AsRef<str> + Send,
145    ) -> Result<Vec<Self::Output>, sqlx::Error>
146    where
147        P: InstrumentedPool<Database = DB> + Send + Sync,
148        for<'c> &'c P: Executor<'c, Database = DB>,
149    {
150        let span = query_span_with_metadata(sql.as_ref(), pool);
151        self.fetch_all(pool.as_executor()).instrument(span).await
152    }
153}
154
155impl<'q, DB, F, A, O> InstrumentedFetch<'q, DB> for sqlx::query::Map<'q, DB, F, A>
156where
157    DB: Database,
158    F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
159    O: Send + Unpin,
160    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
161{
162    type Output = O;
163
164    async fn fetch_one_instrumented<P>(
165        self,
166        pool: &P,
167        sql: impl AsRef<str> + Send,
168    ) -> Result<Self::Output, sqlx::Error>
169    where
170        P: InstrumentedPool<Database = DB> + Send + Sync,
171        for<'c> &'c P: Executor<'c, Database = DB>,
172    {
173        let span = query_span_with_metadata(sql.as_ref(), pool);
174        self.fetch_one(pool.as_executor()).instrument(span).await
175    }
176
177    async fn fetch_optional_instrumented<P>(
178        self,
179        pool: &P,
180        sql: impl AsRef<str> + Send,
181    ) -> Result<Option<Self::Output>, sqlx::Error>
182    where
183        P: InstrumentedPool<Database = DB> + Send + Sync,
184        for<'c> &'c P: Executor<'c, Database = DB>,
185    {
186        let span = query_span_with_metadata(sql.as_ref(), pool);
187        self.fetch_optional(pool.as_executor())
188            .instrument(span)
189            .await
190    }
191
192    async fn fetch_all_instrumented<P>(
193        self,
194        pool: &P,
195        sql: impl AsRef<str> + Send,
196    ) -> Result<Vec<Self::Output>, sqlx::Error>
197    where
198        P: InstrumentedPool<Database = DB> + Send + Sync,
199        for<'c> &'c P: Executor<'c, Database = DB>,
200    {
201        let span = query_span_with_metadata(sql.as_ref(), pool);
202        self.fetch_all(pool.as_executor()).instrument(span).await
203    }
204}
205
206impl<'q, DB, O, A> InstrumentedFetch<'q, DB> for sqlx::query::QueryScalar<'q, DB, O, A>
207where
208    DB: Database,
209    O: Send + Unpin + for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
210    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
211    usize: sqlx::ColumnIndex<DB::Row>,
212{
213    type Output = O;
214
215    async fn fetch_one_instrumented<P>(
216        self,
217        pool: &P,
218        sql: impl AsRef<str> + Send,
219    ) -> Result<Self::Output, sqlx::Error>
220    where
221        P: InstrumentedPool<Database = DB> + Send + Sync,
222        for<'c> &'c P: Executor<'c, Database = DB>,
223    {
224        let span = query_span_with_metadata(sql.as_ref(), pool);
225        self.fetch_one(pool.as_executor()).instrument(span).await
226    }
227
228    async fn fetch_optional_instrumented<P>(
229        self,
230        pool: &P,
231        sql: impl AsRef<str> + Send,
232    ) -> Result<Option<Self::Output>, sqlx::Error>
233    where
234        P: InstrumentedPool<Database = DB> + Send + Sync,
235        for<'c> &'c P: Executor<'c, Database = DB>,
236    {
237        let span = query_span_with_metadata(sql.as_ref(), pool);
238        self.fetch_optional(pool.as_executor())
239            .instrument(span)
240            .await
241    }
242
243    async fn fetch_all_instrumented<P>(
244        self,
245        pool: &P,
246        sql: impl AsRef<str> + Send,
247    ) -> Result<Vec<Self::Output>, sqlx::Error>
248    where
249        P: InstrumentedPool<Database = DB> + Send + Sync,
250        for<'c> &'c P: Executor<'c, Database = DB>,
251    {
252        let span = query_span_with_metadata(sql.as_ref(), pool);
253        self.fetch_all(pool.as_executor()).instrument(span).await
254    }
255}
256
257impl<'q, DB, A> InstrumentedExecute<'q, DB> for sqlx::query::Query<'q, DB, A>
258where
259    DB: Database,
260    A: 'q + Send + sqlx::IntoArguments<'q, DB>,
261{
262    async fn execute_instrumented<P>(
263        self,
264        pool: &P,
265        sql: impl AsRef<str> + Send,
266    ) -> Result<DB::QueryResult, sqlx::Error>
267    where
268        P: InstrumentedPool<Database = DB> + Send + Sync,
269        for<'c> &'c P: Executor<'c, Database = DB>,
270    {
271        let span = query_span_with_metadata(sql.as_ref(), pool);
272        self.execute(pool.as_executor()).instrument(span).await
273    }
274}