1use futures::{StreamExt, TryStreamExt};
2use tracing::Instrument;
3
4impl<'p, DB> sqlx::Executor<'p> for &'_ crate::Pool<DB>
5where
6 DB: sqlx::Database + crate::prelude::Database,
7 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
8{
9 type Database = DB;
10
11 #[doc(hidden)]
12 fn describe<'e, 'q: 'e>(
13 self,
14 sql: &'q str,
15 ) -> futures::future::BoxFuture<'e, Result<sqlx::Describe<Self::Database>, sqlx::Error>> {
16 let attrs = &self.attributes;
17 let span = crate::instrument!("sqlx.describe", sql, attrs);
18 let fut = self.inner.describe(sql);
19 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
20 }
21
22 fn execute<'e, 'q: 'e, E>(
23 self,
24 query: E,
25 ) -> futures::future::BoxFuture<
26 'e,
27 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
28 >
29 where
30 E: 'q + sqlx::Execute<'q, Self::Database>,
31 {
32 let sql = query.sql();
33 let attrs = &self.attributes;
34 let span = crate::instrument!("sqlx.execute", sql, attrs);
35 let fut = self.inner.execute(query);
36 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
37 }
38
39 fn execute_many<'e, 'q: 'e, E>(
40 self,
41 query: E,
42 ) -> futures::stream::BoxStream<
43 'e,
44 Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
45 >
46 where
47 E: 'q + sqlx::Execute<'q, Self::Database>,
48 {
49 let sql = query.sql();
50 let attrs = &self.attributes;
51 let span = crate::instrument!("sqlx.execute_many", sql, attrs);
52 let stream = self.inner.execute_many(query);
53 use futures::StreamExt;
54 Box::pin(
55 stream
56 .inspect(move |_| {
57 let _enter = span.enter();
58 })
59 .inspect_err(crate::span::record_error),
60 )
61 }
62
63 fn fetch<'e, 'q: 'e, E>(
64 self,
65 query: E,
66 ) -> futures::stream::BoxStream<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
67 where
68 E: 'q + sqlx::Execute<'q, Self::Database>,
69 {
70 let sql = query.sql();
71 let attrs = &self.attributes;
72 let span = crate::instrument!("sqlx.fetch", sql, attrs);
73 let stream = self.inner.fetch(query);
74 use futures::StreamExt;
75 Box::pin(
76 stream
77 .inspect(move |_| {
78 let _enter = span.enter();
79 })
80 .inspect_err(crate::span::record_error),
81 )
82 }
83
84 fn fetch_all<'e, 'q: 'e, E>(
85 self,
86 query: E,
87 ) -> futures::future::BoxFuture<
88 'e,
89 Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
90 >
91 where
92 E: 'q + sqlx::Execute<'q, Self::Database>,
93 {
94 let sql = query.sql();
95 let attrs = &self.attributes;
96 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
97 let fut = self.inner.fetch_all(query);
98 Box::pin(
99 async move {
100 fut.await
101 .inspect(|res| {
102 let span = tracing::Span::current();
103 span.record("db.response.returned_rows", res.len());
104 })
105 .inspect_err(crate::span::record_error)
106 }
107 .instrument(span),
108 )
109 }
110
111 fn fetch_many<'e, 'q: 'e, E>(
112 self,
113 query: E,
114 ) -> futures::stream::BoxStream<
115 'e,
116 Result<
117 sqlx::Either<
118 <Self::Database as sqlx::Database>::QueryResult,
119 <Self::Database as sqlx::Database>::Row,
120 >,
121 sqlx::Error,
122 >,
123 >
124 where
125 E: 'q + sqlx::Execute<'q, Self::Database>,
126 {
127 let sql = query.sql();
128 let attrs = &self.attributes;
129 let span = crate::instrument!("sqlx.fetch_all", sql, attrs);
130 let stream = self.inner.fetch_many(query);
131 Box::pin(
132 stream
133 .inspect(move |_| {
134 let _enter = span.enter();
135 })
136 .inspect_err(crate::span::record_error),
137 )
138 }
139
140 fn fetch_one<'e, 'q: 'e, E>(
141 self,
142 query: E,
143 ) -> futures::future::BoxFuture<'e, Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>>
144 where
145 E: 'q + sqlx::Execute<'q, Self::Database>,
146 {
147 let sql = query.sql();
148 let attrs = &self.attributes;
149 let span = crate::instrument!("sqlx.fetch_one", sql, attrs);
150 let fut = self.inner.fetch_one(query);
151 Box::pin(
152 async move {
153 fut.await
154 .inspect(crate::span::record_one)
155 .inspect_err(crate::span::record_error)
156 }
157 .instrument(span),
158 )
159 }
160
161 fn fetch_optional<'e, 'q: 'e, E>(
162 self,
163 query: E,
164 ) -> futures::future::BoxFuture<
165 'e,
166 Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
167 >
168 where
169 E: 'q + sqlx::Execute<'q, Self::Database>,
170 {
171 let sql = query.sql();
172 let attrs = &self.attributes;
173 let span = crate::instrument!("sqlx.fetch_optional", sql, attrs);
174 let fut = self.inner.fetch_optional(query);
175 Box::pin(
176 async move {
177 fut.await
178 .inspect(crate::span::record_optional)
179 .inspect_err(crate::span::record_error)
180 }
181 .instrument(span),
182 )
183 }
184
185 fn prepare<'e, 'q: 'e>(
186 self,
187 query: &'q str,
188 ) -> futures::future::BoxFuture<
189 'e,
190 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
191 > {
192 let attrs = &self.attributes;
193 let span = crate::instrument!("sqlx.prepare", query, attrs);
194 let fut = self.inner.prepare(query);
195 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
196 }
197
198 fn prepare_with<'e, 'q: 'e>(
199 self,
200 sql: &'q str,
201 parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
202 ) -> futures::future::BoxFuture<
203 'e,
204 Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
205 > {
206 let attrs = &self.attributes;
207 let span = crate::instrument!("sqlx.prepare_with", sql, attrs);
208 let fut = self.inner.prepare_with(sql, parameters);
209 Box::pin(async move { fut.await.inspect_err(crate::span::record_error) }.instrument(span))
210 }
211}