Skip to main content

sqlx_tracing/
transaction.rs

1use futures::{StreamExt, TryStreamExt};
2use sqlx::Error;
3use tracing::Instrument;
4
5impl<'c, DB> crate::Transaction<'c, DB>
6where
7    DB: crate::prelude::Database + sqlx::Database,
8    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
9{
10    /// Returns a tracing-instrumented executor for this transaction.
11    ///
12    /// This allows running queries with full span context and attributes.
13    pub fn executor(&mut self) -> crate::Connection<'_, DB> {
14        crate::Connection {
15            inner: &mut *self.inner,
16            attributes: self.attributes.clone(),
17        }
18    }
19
20    /// Commits this transaction or savepoint.
21    pub async fn commit(self) -> Result<(), Error> {
22        self.inner.commit().await
23    }
24
25    /// Aborts this transaction or savepoint.
26    pub async fn rollback(self) -> Result<(), Error> {
27        self.inner.rollback().await
28    }
29}
30
31/// Implements `sqlx::Executor` for a mutable reference to a tracing-instrumented transaction.
32///
33/// Each method creates a tracing span for the SQL operation, attaches relevant attributes,
34/// and records errors or row counts as appropriate for observability.
35impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Transaction<'c, DB>
36where
37    DB: crate::prelude::Database + sqlx::Database,
38    for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
39{
40    type Database = DB;
41
42    #[doc(hidden)]
43    fn describe<'e, 'q: 'e>(
44        self,
45        sql: &'q str,
46    ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
47    where
48        'c: 'e,
49    {
50        let attrs = &self.attributes;
51        let span = crate::instrument!("sqlx.describe", sql, attrs);
52        Box::pin(
53            async move {
54                let fut = (&mut self.inner).describe(sql);
55                fut.await.inspect_err(crate::span::record_error)
56            }
57            .instrument(span),
58        )
59    }
60
61    fn execute<'e, 'q: 'e, E>(
62        self,
63        query: E,
64    ) -> futures::future::BoxFuture<
65        'e,
66        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
67    >
68    where
69        E: 'q + sqlx::Execute<'q, Self::Database>,
70        'c: 'e,
71    {
72        let sql = query.sql();
73        let attrs = &self.attributes;
74        let span = crate::instrument!("sqlx.execute", sql, attrs);
75        let fut = (&mut self.inner).execute(query);
76        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
77    }
78
79    fn execute_many<'e, 'q: 'e, E>(
80        self,
81        query: E,
82    ) -> futures::stream::BoxStream<
83        'e,
84        Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
85    >
86    where
87        E: 'q + sqlx::Execute<'q, Self::Database>,
88        'c: 'e,
89    {
90        let sql = query.sql();
91        let attrs = &self.attributes;
92        let span = crate::instrument!("sqlx.execute_many", sql, attrs);
93        let stream = (&mut self.inner).execute_many(query);
94        use futures::StreamExt;
95        Box::pin(
96            stream
97                .inspect(move |_| {
98                    let _enter = span.enter();
99                })
100                .inspect_err(crate::span::record_error),
101        )
102    }
103
104    fn fetch<'e, 'q: 'e, E>(
105        self,
106        query: E,
107    ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
108    where
109        E: 'q + sqlx::Execute<'q, Self::Database>,
110        'c: 'e,
111    {
112        let sql = query.sql();
113        let attrs = &self.attributes;
114        let span = crate::instrument!("sqlx.fetch", sql, attrs);
115        let stream = (&mut self.inner).fetch(query);
116        use futures::StreamExt;
117        Box::pin(
118            stream
119                .inspect(move |_| {
120                    let _enter = span.enter();
121                })
122                .inspect_err(crate::span::record_error),
123        )
124    }
125
126    fn fetch_all<'e, 'q: 'e, E>(
127        self,
128        query: E,
129    ) -> futures::future::BoxFuture<
130        'e,
131        Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
132    >
133    where
134        E: 'q + sqlx::Execute<'q, Self::Database>,
135        'c: 'e,
136    {
137        let sql = query.sql();
138        let attrs = &self.attributes;
139        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
140        let fut = (&mut self.inner).fetch_all(query);
141        Box::pin(
142            async move {
143                fut.await
144                    .inspect(|res| {
145                        let span = tracing::Span::current();
146                        span.record("db.response.returned_rows", res.len());
147                    })
148                    .inspect_err(crate::span::record_error)
149            }
150            .instrument(span),
151        )
152    }
153
154    fn fetch_many<'e, 'q: 'e, E>(
155        self,
156        query: E,
157    ) -> futures::stream::BoxStream<
158        'e,
159        Result<
160            sqlx::Either<
161                <Self::Database as sqlx::Database>::QueryResult,
162                <Self::Database as sqlx::Database>::Row,
163            >,
164            sqlx::Error,
165        >,
166    >
167    where
168        E: 'q + sqlx::Execute<'q, Self::Database>,
169        'c: 'e,
170    {
171        let sql = query.sql();
172        let attrs = &self.attributes;
173        let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
174        let stream = (&mut self.inner).fetch_many(query);
175        Box::pin(
176            stream
177                .inspect(move |_| {
178                    let _enter = span.enter();
179                })
180                .inspect_err(crate::span::record_error),
181        )
182    }
183
184    fn fetch_one<'e, 'q: 'e, E>(
185        self,
186        query: E,
187    ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
188    where
189        E: 'q + sqlx::Execute<'q, Self::Database>,
190        'c: 'e,
191    {
192        let sql = query.sql();
193        let attrs = &self.attributes;
194        let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
195        let fut = (&mut self.inner).fetch_one(query);
196        Box::pin(
197            async move {
198                fut.await
199                    .inspect(crate::span::record_one)
200                    .inspect_err(crate::span::record_error)
201            }
202            .instrument(span),
203        )
204    }
205
206    fn fetch_optional<'e, 'q: 'e, E>(
207        self,
208        query: E,
209    ) -> futures::future::BoxFuture<
210        'e,
211        Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
212    >
213    where
214        E: 'q + sqlx::Execute<'q, Self::Database>,
215        'c: 'e,
216    {
217        let sql = query.sql();
218        let attrs = &self.attributes;
219        let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
220        let fut = (&mut self.inner).fetch_optional(query);
221        Box::pin(
222            async move {
223                fut.await
224                    .inspect(crate::span::record_optional)
225                    .inspect_err(crate::span::record_error)
226            }
227            .instrument(span),
228        )
229    }
230
231    fn prepare<'e, 'q: 'e>(
232        self,
233        query: &'q str,
234    ) -> futures::future::BoxFuture<
235        'e,
236        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
237    >
238    where
239        'c: 'e,
240    {
241        let attrs = &self.attributes;
242        let span = crate::instrument!("sqlx.prepare", query, attrs);
243        let fut = (&mut self.inner).prepare(query);
244        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
245    }
246
247    fn prepare_with<'e, 'q: 'e>(
248        self,
249        sql: &'q str,
250        parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
251    ) -> futures::future::BoxFuture<
252        'e,
253        Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
254    >
255    where
256        'c: 'e,
257    {
258        let attrs = &self.attributes;
259        let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
260        let fut = (&mut self.inner).prepare_with(sql, parameters);
261        Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
262    }
263}