sqlx_askama_template/v3/sql_template_execute.rs
1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4 Either, Error,
5 arguments::IntoArguments,
6 database::{Database, HasStatementCache},
7 executor::{Execute, Executor},
8 from_row::FromRow,
9 query::{Map, Query, query, query_with},
10 query_as::{QueryAs, query_as, query_as_with},
11};
12/// Internal executor for SQL templates
13pub struct SqlTemplateExecute<'q, DB: Database> {
14 /// Reference to SQL query string
15 pub(crate) sql: &'q str,
16 /// SQL parameters
17 pub(crate) arguments: Option<DB::Arguments<'q>>,
18 /// Persistent flag
19 pub(crate) persistent: bool,
20}
21impl<'q, DB: Database> Clone for SqlTemplateExecute<'q, DB>
22where
23 DB::Arguments<'q>: Clone,
24{
25 fn clone(&self) -> Self {
26 SqlTemplateExecute {
27 sql: self.sql,
28 arguments: self.arguments.clone(),
29 persistent: self.persistent,
30 }
31 }
32}
33impl<'q, DB: Database> SqlTemplateExecute<'q, DB> {
34 /// Creates a new SQL template executor
35 pub fn new(sql: &'q str, arguments: Option<DB::Arguments<'q>>) -> Self {
36 SqlTemplateExecute {
37 sql,
38 arguments,
39 persistent: true,
40 }
41 }
42 /// If `true`, the statement will get prepared once and cached to the
43 /// connection's statement cache.
44 ///
45 /// If queried once with the flag set to `true`, all subsequent queries
46 /// matching the one with the flag will use the cached statement until the
47 /// cache is cleared.
48 ///
49 /// If `false`, the prepared statement will be closed after execution.
50 ///
51 /// Default: `true`.
52 pub fn set_persistent(mut self, persistent: bool) -> Self {
53 self.persistent = persistent;
54 self
55 }
56}
57impl<'q, DB> SqlTemplateExecute<'q, DB>
58where
59 DB: Database + HasStatementCache,
60 DB::Arguments<'q>: IntoArguments<'q, DB>,
61{
62 /// to sqlx_core::QueryAs
63 /// Converts the SQL template to a `QueryAs` object, which can be executed to fetch rows
64 #[inline]
65 pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments<'q>>
66 where
67 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
68 {
69 let q = match self.arguments {
70 Some(args) => query_as_with(self.sql, args),
71 None => query_as(self.sql),
72 };
73 q.persistent(self.persistent)
74 }
75 /// to sqlx_core::Query
76 /// Converts the SQL template to a `Query` object, which can be executed to fetch rows
77 #[inline]
78 pub fn to_query(self) -> Query<'q, DB, DB::Arguments<'q>> {
79 let q = match self.arguments {
80 Some(args) => {
81 // let wrap = ArgWrapper(args);
82 query_with(self.sql, args)
83 }
84 None => query(self.sql),
85 };
86 q.persistent(self.persistent)
87 }
88 /// like sqlx_core::Query::map
89 /// Map each row in the result to another type.
90 #[inline]
91 pub fn map<F, O>(
92 self,
93 f: F,
94 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments<'q>>
95 where
96 F: FnMut(DB::Row) -> O + Send,
97 O: Unpin,
98 {
99 self.to_query().map(f)
100 }
101
102 /// like sqlx_core::Query::try_map
103 /// Map each row in the result to another type, returning an error if the mapping fails.
104 #[inline]
105 pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments<'q>>
106 where
107 F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
108 O: Unpin,
109 {
110 self.to_query().try_map(f)
111 }
112}
113impl<'q, DB> SqlTemplateExecute<'q, DB>
114where
115 DB: Database,
116{
117 /// like sqlx_core::Query::execute
118 /// Execute the query and return the number of rows affected.
119 #[inline]
120 pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
121 where
122 'q: 'e,
123 DB::Arguments<'q>: 'e,
124 E: Executor<'c, Database = DB>,
125 {
126 executor.execute(self).await
127 }
128 /// like sqlx_core::Query::execute_many
129 /// Execute multiple queries and return the rows affected from each query, in a stream.
130 #[inline]
131 pub fn execute_many<'e, 'c: 'e, E>(
132 self,
133 executor: E,
134 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
135 where
136 'q: 'e,
137 DB::Arguments<'q>: 'e,
138 E: Executor<'c, Database = DB>,
139 {
140 #[allow(deprecated)]
141 executor.execute_many(self)
142 }
143 /// like sqlx_core::Query::fetch
144 /// Execute the query and return the generated results as a stream.
145 #[inline]
146 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
147 where
148 'q: 'e,
149 DB::Arguments<'q>: 'e,
150 E: Executor<'c, Database = DB>,
151 {
152 executor.fetch(self)
153 }
154 /// like sqlx_core::Query::fetch_many
155 /// Execute multiple queries and return the generated results as a stream.
156 ///
157 /// For each query in the stream, any generated rows are returned first,
158 /// then the `QueryResult` with the number of rows affected.
159 #[inline]
160 #[allow(clippy::type_complexity)]
161 pub fn fetch_many<'e, 'c: 'e, E>(
162 self,
163 executor: E,
164 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
165 where
166 'q: 'e,
167 DB::Arguments<'q>: 'e,
168 E: Executor<'c, Database = DB>,
169 {
170 #[allow(deprecated)]
171 executor.fetch_many(self)
172 }
173 /// like sqlx_core::Query::fetch_all
174 /// Execute the query and return all the resulting rows collected into a [`Vec`].
175 ///
176 /// ### Note: beware result set size.
177 /// This will attempt to collect the full result set of the query into memory.
178 ///
179 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
180 /// e.g. using `LIMIT`.
181 #[inline]
182 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
183 where
184 'q: 'e,
185 DB::Arguments<'q>: 'e,
186 E: Executor<'c, Database = DB>,
187 {
188 executor.fetch_all(self).await
189 }
190 /// like sqlx_core::Query::fetch_one
191 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
192 ///
193 /// ### Note: for best performance, ensure the query returns at most one row.
194 /// Depending on the driver implementation, if your query can return more than one row,
195 /// it may lead to wasted CPU time and bandwidth on the database server.
196 ///
197 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
198 /// can result in a more optimal query plan.
199 ///
200 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
201 ///
202 /// Otherwise, you might want to add `LIMIT 1` to your query.
203 #[inline]
204 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
205 where
206 'q: 'e,
207 DB::Arguments<'q>: 'e,
208 E: Executor<'c, Database = DB>,
209 {
210 executor.fetch_one(self).await
211 }
212 /// like sqlx_core::Query::fetch_optional
213 /// Execute the query, returning the first row or `None` otherwise.
214 ///
215 /// ### Note: for best performance, ensure the query returns at most one row.
216 /// Depending on the driver implementation, if your query can return more than one row,
217 /// it may lead to wasted CPU time and bandwidth on the database server.
218 ///
219 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
220 /// can result in a more optimal query plan.
221 ///
222 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
223 ///
224 /// Otherwise, you might want to add `LIMIT 1` to your query.
225 #[inline]
226 pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
227 where
228 'q: 'e,
229 DB::Arguments<'q>: 'e,
230 E: Executor<'c, Database = DB>,
231 {
232 executor.fetch_optional(self).await
233 }
234
235 // QueryAs functions wrapp
236
237 /// like sqlx_core::QueryAs::fetch
238 /// Execute the query and return the generated results as a stream.
239 pub fn fetch_as<'e, 'c: 'e, O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
240 where
241 'q: 'e,
242 DB::Arguments<'q>: 'e,
243 E: 'e + Executor<'c, Database = DB>,
244 DB: 'e,
245 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
246 {
247 self.fetch_many_as(executor)
248 .try_filter_map(|step| async move { Ok(step.right()) })
249 .boxed()
250 }
251 /// like sqlx_core::QueryAs::fetch_many
252 /// Execute multiple queries and return the generated results as a stream
253 /// from each query, in a stream.
254 pub fn fetch_many_as<'e, 'c: 'e, O, E>(
255 self,
256 executor: E,
257 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
258 where
259 'q: 'e,
260 DB::Arguments<'q>: 'e,
261 E: 'e + Executor<'c, Database = DB>,
262 DB: 'e,
263 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
264 {
265 #[allow(deprecated)]
266 executor
267 .fetch_many(self)
268 .map(|v| match v {
269 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
270 Ok(Either::Left(v)) => Ok(Either::Left(v)),
271 Err(e) => Err(e),
272 })
273 .boxed()
274 }
275 /// like sqlx_core::QueryAs::fetch_all
276 /// Execute the query and return all the resulting rows collected into a [`Vec`].
277 ///
278 /// ### Note: beware result set size.
279 /// This will attempt to collect the full result set of the query into memory.
280 ///
281 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
282 /// e.g. using `LIMIT`.
283 #[inline]
284 pub async fn fetch_all_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Vec<O>, Error>
285 where
286 'q: 'e,
287 DB::Arguments<'q>: 'e,
288 E: 'e + Executor<'c, Database = DB>,
289 DB: 'e,
290 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
291 {
292 self.fetch_as(executor).try_collect().await
293 }
294 /// like sqlx_core::QueryAs::fetch_one
295 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
296 ///
297 /// ### Note: for best performance, ensure the query returns at most one row.
298 /// Depending on the driver implementation, if your query can return more than one row,
299 /// it may lead to wasted CPU time and bandwidth on the database server.
300 ///
301 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
302 /// can result in a more optimal query plan.
303 ///
304 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
305 ///
306 /// Otherwise, you might want to add `LIMIT 1` to your query.
307 pub async fn fetch_one_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<O, Error>
308 where
309 'q: 'e,
310 DB::Arguments<'q>: 'e,
311 E: 'e + Executor<'c, Database = DB>,
312 DB: 'e,
313 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
314 {
315 self.fetch_optional_as(executor)
316 .await
317 .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
318 }
319 /// like sqlx_core_core::QueryAs::fetch_optional
320 /// Execute the query, returning the first row or `None` otherwise.
321 ///
322 /// ### Note: for best performance, ensure the query returns at most one row.
323 /// Depending on the driver implementation, if your query can return more than one row,
324 /// it may lead to wasted CPU time and bandwidth on the database server.
325 ///
326 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
327 /// can result in a more optimal query plan.
328 ///
329 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
330 ///
331 /// Otherwise, you might want to add `LIMIT 1` to your query.
332 pub async fn fetch_optional_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Option<O>, Error>
333 where
334 'q: 'e,
335 DB::Arguments<'q>: 'e,
336 E: 'e + Executor<'c, Database = DB>,
337 DB: 'e,
338 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
339 {
340 let row = executor.fetch_optional(self).await?;
341 if let Some(row) = row {
342 O::from_row(&row).map(Some)
343 } else {
344 Ok(None)
345 }
346 }
347}
348
349impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<'q, DB> {
350 /// Returns the SQL query string
351 #[inline]
352 fn sql(&self) -> &'q str {
353 log::debug!("Executing SQL: {}", self.sql);
354 self.sql
355 }
356
357 /// Gets prepared statement (not supported in this implementation)
358 #[inline]
359 fn statement(&self) -> Option<&DB::Statement<'q>> {
360 None
361 }
362
363 /// Takes ownership of the bound arguments
364 #[inline]
365 fn take_arguments(
366 &mut self,
367 ) -> Result<Option<DB::Arguments<'q>>, sqlx_core::error::BoxDynError> {
368 Ok(self.arguments.take())
369 }
370
371 /// Checks if query is persistent
372 #[inline]
373 fn persistent(&self) -> bool {
374 self.persistent
375 }
376}