sqlx_tracing/
transaction.rs

1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4impl<'c, DB> crate::Transaction<'c, DB>
5where
6    DB: crate::prelude::Database + sqlx::Database,
7    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
8{
9    /// Returns a tracing-instrumented executor for this transaction.
10    ///
11    /// This allows running queries with full span context and attributes.
12    pub fn executor(&mut self) -> crate::Connection<'_, DB> {
13        crate::Connection {
14            inner: &mut *self.inner,
15            attributes: self.attributes.clone(),
16        }
17    }
18}
19
20/// Implements `sqlx::Executor` for a mutable reference to a tracing-instrumented transaction.
21///
22/// Each method creates a tracing span for the SQL operation, attaches relevant attributes,
23/// and records errors or row counts as appropriate for observability.
24impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Transaction<'c, DB>
25where
26    DB: crate::prelude::Database + sqlx::Database,
27    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
28{
29    type Database = DB;
30
31    #[doc(hidden)]
32    fn describe<'e, 'q: 'e>(
33        self,
34        sql: &'q str,
35    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
36    where
37        'c: 'e,
38    {
39        let attrs = &self.attributes;
40        let span = crate::instrument!("sqlx.describe", sql, attrs);
41        Box::pin(
42            async move {
43                let fut = (&mut self.inner).describe(sql);
44                fut.await.inspect_err(crate::span::record_error)
45            }
46            .instrument(span),
47        )
48    }
49
50    fn execute<'e, 'q: 'e, E>(
51        self,
52        query: E,
53    ) -> futures::future::BoxFuture<
54        'e,
55        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
56    >
57    where
58        E: 'q + sqlx::Execute<'q, Self::Database>,
59        'c: 'e,
60    {
61        let sql = query.sql();
62        let attrs = &self.attributes;
63        let span = crate::instrument!("sqlx.execute", sql, attrs);
64        let fut = (&mut self.inner).execute(query);
65        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
66    }
67
68    fn execute_many<'e, 'q: 'e, E>(
69        self,
70        query: E,
71    ) -> futures::stream::BoxStream<
72        'e,
73        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
74    >
75    where
76        E: 'q + sqlx::Execute<'q, Self::Database>,
77        'c: 'e,
78    {
79        let sql = query.sql();
80        let attrs = &self.attributes;
81        let span = crate::instrument!("sqlx.execute_many", sql, attrs);
82        let stream = (&mut self.inner).execute_many(query);
83        use futures::StreamExt;
84        Box::pin(
85            stream
86                .inspect(move |_| {
87                    let _enter = span.enter();
88                })
89                .inspect_err(crate::span::record_error),
90        )
91    }
92
93    fn fetch<'e, 'q: 'e, E>(
94        self,
95        query: E,
96    ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
97    where
98        E: 'q + sqlx::Execute<'q, Self::Database>,
99        'c: 'e,
100    {
101        let sql = query.sql();
102        let attrs = &self.attributes;
103        let span = crate::instrument!("sqlx.fetch", sql, attrs);
104        let stream = (&mut self.inner).fetch(query);
105        use futures::StreamExt;
106        Box::pin(
107            stream
108                .inspect(move |_| {
109                    let _enter = span.enter();
110                })
111                .inspect_err(crate::span::record_error),
112        )
113    }
114
115    fn fetch_all<'e, 'q: 'e, E>(
116        self,
117        query: E,
118    ) -> futures::future::BoxFuture<
119        'e,
120        Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
121    >
122    where
123        E: 'q + sqlx::Execute<'q, Self::Database>,
124        'c: 'e,
125    {
126        let sql = query.sql();
127        let attrs = &self.attributes;
128        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
129        let fut = (&mut self.inner).fetch_all(query);
130        Box::pin(
131            async move {
132                fut.await
133                    .inspect(|res| {
134                        let span = tracing::Span::current();
135                        span.record("db.response.returned_rows", res.len());
136                    })
137                    .inspect_err(crate::span::record_error)
138            }
139            .instrument(span),
140        )
141    }
142
143    fn fetch_many<'e, 'q: 'e, E>(
144        self,
145        query: E,
146    ) -> futures::stream::BoxStream<
147        'e,
148        Result<
149            sqlx::Either<
150                <Self::Database as sqlx::Database>::QueryResult,
151                <Self::Database as sqlx::Database>::Row,
152            >,
153            sqlx::Error,
154        >,
155    >
156    where
157        E: 'q + sqlx::Execute<'q, Self::Database>,
158        'c: 'e,
159    {
160        let sql = query.sql();
161        let attrs = &self.attributes;
162        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
163        let stream = (&mut self.inner).fetch_many(query);
164        Box::pin(
165            stream
166                .inspect(move |_| {
167                    let _enter = span.enter();
168                })
169                .inspect_err(crate::span::record_error),
170        )
171    }
172
173    fn fetch_one<'e, 'q: 'e, E>(
174        self,
175        query: E,
176    ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
177    where
178        E: 'q + sqlx::Execute<'q, Self::Database>,
179        'c: 'e,
180    {
181        let sql = query.sql();
182        let attrs = &self.attributes;
183        let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
184        let fut = (&mut self.inner).fetch_one(query);
185        Box::pin(
186            async move {
187                fut.await
188                    .inspect(crate::span::record_one)
189                    .inspect_err(crate::span::record_error)
190            }
191            .instrument(span),
192        )
193    }
194
195    fn fetch_optional<'e, 'q: 'e, E>(
196        self,
197        query: E,
198    ) -> futures::future::BoxFuture<
199        'e,
200        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
201    >
202    where
203        E: 'q + sqlx::Execute<'q, Self::Database>,
204        'c: 'e,
205    {
206        let sql = query.sql();
207        let attrs = &self.attributes;
208        let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
209        let fut = (&mut self.inner).fetch_optional(query);
210        Box::pin(
211            async move {
212                fut.await
213                    .inspect(crate::span::record_optional)
214                    .inspect_err(crate::span::record_error)
215            }
216            .instrument(span),
217        )
218    }
219
220    fn prepare<'e, 'q: 'e>(
221        self,
222        query: &'q str,
223    ) -> futures::future::BoxFuture<
224        'e,
225        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
226    >
227    where
228        'c: 'e,
229    {
230        let attrs = &self.attributes;
231        let span = crate::instrument!("sqlx.prepare", query, attrs);
232        let fut = (&mut self.inner).prepare(query);
233        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
234    }
235
236    fn prepare_with<'e, 'q: 'e>(
237        self,
238        sql: &'q str,
239        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
240    ) -> futures::future::BoxFuture<
241        'e,
242        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
243    >
244    where
245        'c: 'e,
246    {
247        let attrs = &self.attributes;
248        let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
249        let fut = (&mut self.inner).prepare_with(sql, parameters);
250        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
251    }
252}