1use futures::{StreamExt, TryStreamExt};
2use sqlx::Error;
3use tracing::Instrument;
4
5impl<'c, DB> crate::Transaction<'c, DB>
6where
7 DB: crate::prelude::Database + sqlx::Database,
8 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
9{
10 pub fn executor(&mut self) -> crate::Connection<'_, DB> {
14 crate::Connection {
15 inner: &mut *self.inner,
16 attributes: self.attributes.clone(),
17 }
18 }
19
20 pub async fn commit(self) -> Result<(), Error> {
22 self.inner.commit().await
23 }
24
25 pub async fn rollback(self) -> Result<(), Error> {
27 self.inner.rollback().await
28 }
29}
30
31impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Transaction<'c, DB>
36where
37 DB: crate::prelude::Database + sqlx::Database,
38 for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
39{
40 type Database = DB;
41
42 #[doc(hidden)]
43 fn describe<'e, 'q: 'e>(
44 self,
45 sql: &'q str,
46 ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>>
47 where
48 'c: 'e,
49 {
50 let attrs = &self.attributes;
51 let span = crate::instrument!("sqlx.describe", sql, attrs);
52 Box::pin(
53 async move {
54 let fut = (&mut self.inner).describe(sql);
55 fut.await.inspect_err(crate::span::record_error)
56 }
57 .instrument(span),
58 )
59 }
60
61 fn execute<'e, 'q: 'e, E>(
62 self,
63 query: E,
64 ) -> futures::future::BoxFuture<
65 'e,
66 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
67 >
68 where
69 E: 'q + sqlx::Execute<'q, Self::Database>,
70 'c: 'e,
71 {
72 let sql = query.sql();
73 let attrs = &self.attributes;
74 let span = crate::instrument!("sqlx.execute", sql, attrs);
75 let fut = (&mut self.inner).execute(query);
76 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
77 }
78
79 fn execute_many<'e, 'q: 'e, E>(
80 self,
81 query: E,
82 ) -> futures::stream::BoxStream<
83 'e,
84 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
85 >
86 where
87 E: 'q + sqlx::Execute<'q, Self::Database>,
88 'c: 'e,
89 {
90 let sql = query.sql();
91 let attrs = &self.attributes;
92 let span = crate::instrument!("sqlx.execute_many", sql, attrs);
93 let stream = (&mut self.inner).execute_many(query);
94 use futures::StreamExt;
95 Box::pin(
96 stream
97 .inspect(move |_| {
98 let _enter = span.enter();
99 })
100 .inspect_err(crate::span::record_error),
101 )
102 }
103
104 fn fetch<'e, 'q: 'e, E>(
105 self,
106 query: E,
107 ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
108 where
109 E: 'q + sqlx::Execute<'q, Self::Database>,
110 'c: 'e,
111 {
112 let sql = query.sql();
113 let attrs = &self.attributes;
114 let span = crate::instrument!("sqlx.fetch", sql, attrs);
115 let stream = (&mut self.inner).fetch(query);
116 use futures::StreamExt;
117 Box::pin(
118 stream
119 .inspect(move |_| {
120 let _enter = span.enter();
121 })
122 .inspect_err(crate::span::record_error),
123 )
124 }
125
126 fn fetch_all<'e, 'q: 'e, E>(
127 self,
128 query: E,
129 ) -> futures::future::BoxFuture<
130 'e,
131 Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
132 >
133 where
134 E: 'q + sqlx::Execute<'q, Self::Database>,
135 'c: 'e,
136 {
137 let sql = query.sql();
138 let attrs = &self.attributes;
139 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
140 let fut = (&mut self.inner).fetch_all(query);
141 Box::pin(
142 async move {
143 fut.await
144 .inspect(|res| {
145 let span = tracing::Span::current();
146 span.record("db.response.returned_rows", res.len());
147 })
148 .inspect_err(crate::span::record_error)
149 }
150 .instrument(span),
151 )
152 }
153
154 fn fetch_many<'e, 'q: 'e, E>(
155 self,
156 query: E,
157 ) -> futures::stream::BoxStream<
158 'e,
159 Result<
160 sqlx::Either<
161 <Self::Database as sqlx::Database>::QueryResult,
162 <Self::Database as sqlx::Database>::Row,
163 >,
164 sqlx::Error,
165 >,
166 >
167 where
168 E: 'q + sqlx::Execute<'q, Self::Database>,
169 'c: 'e,
170 {
171 let sql = query.sql();
172 let attrs = &self.attributes;
173 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
174 let stream = (&mut self.inner).fetch_many(query);
175 Box::pin(
176 stream
177 .inspect(move |_| {
178 let _enter = span.enter();
179 })
180 .inspect_err(crate::span::record_error),
181 )
182 }
183
184 fn fetch_one<'e, 'q: 'e, E>(
185 self,
186 query: E,
187 ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
188 where
189 E: 'q + sqlx::Execute<'q, Self::Database>,
190 'c: 'e,
191 {
192 let sql = query.sql();
193 let attrs = &self.attributes;
194 let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
195 let fut = (&mut self.inner).fetch_one(query);
196 Box::pin(
197 async move {
198 fut.await
199 .inspect(crate::span::record_one)
200 .inspect_err(crate::span::record_error)
201 }
202 .instrument(span),
203 )
204 }
205
206 fn fetch_optional<'e, 'q: 'e, E>(
207 self,
208 query: E,
209 ) -> futures::future::BoxFuture<
210 'e,
211 Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
212 >
213 where
214 E: 'q + sqlx::Execute<'q, Self::Database>,
215 'c: 'e,
216 {
217 let sql = query.sql();
218 let attrs = &self.attributes;
219 let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
220 let fut = (&mut self.inner).fetch_optional(query);
221 Box::pin(
222 async move {
223 fut.await
224 .inspect(crate::span::record_optional)
225 .inspect_err(crate::span::record_error)
226 }
227 .instrument(span),
228 )
229 }
230
231 fn prepare<'e, 'q: 'e>(
232 self,
233 query: &'q str,
234 ) -> futures::future::BoxFuture<
235 'e,
236 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
237 >
238 where
239 'c: 'e,
240 {
241 let attrs = &self.attributes;
242 let span = crate::instrument!("sqlx.prepare", query, attrs);
243 let fut = (&mut self.inner).prepare(query);
244 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
245 }
246
247 fn prepare_with<'e, 'q: 'e>(
248 self,
249 sql: &'q str,
250 parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
251 ) -> futures::future::BoxFuture<
252 'e,
253 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
254 >
255 where
256 'c: 'e,
257 {
258 let attrs = &self.attributes;
259 let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
260 let fut = (&mut self.inner).prepare_with(sql, parameters);
261 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
262 }
263}