sqlx_askama_template/v3/sql_templte_execute.rs
1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4 Either, Error,
5 arguments::IntoArguments,
6 database::{Database, HasStatementCache},
7 executor::{Execute, Executor},
8 from_row::FromRow,
9 query::{Map, Query, query, query_with},
10 query_as::{QueryAs, query_as, query_as_with},
11};
12/// Internal executor for SQL templates
13pub struct SqlTemplateExecute<'q, DB: Database> {
14 /// Reference to SQL query string
15 pub(crate) sql: &'q String,
16 /// SQL parameters
17 pub(crate) arguments: Option<DB::Arguments<'q>>,
18 /// Persistent flag
19 pub(crate) persistent: bool,
20}
21impl<'q, DB: Database> Clone for SqlTemplateExecute<'q, DB>
22where
23 DB::Arguments<'q>: Clone,
24{
25 fn clone(&self) -> Self {
26 SqlTemplateExecute {
27 sql: self.sql,
28 arguments: self.arguments.clone(),
29 persistent: self.persistent,
30 }
31 }
32}
33impl<'q, DB: Database> SqlTemplateExecute<'q, DB> {
34 /// Creates a new SQL template executor
35 pub fn new(sql: &'q String, arguments: Option<DB::Arguments<'q>>) -> Self {
36 SqlTemplateExecute {
37 sql,
38 arguments,
39 persistent: true,
40 }
41 }
42 /// If `true`, the statement will get prepared once and cached to the
43 /// connection's statement cache.
44 ///
45 /// If queried once with the flag set to `true`, all subsequent queries
46 /// matching the one with the flag will use the cached statement until the
47 /// cache is cleared.
48 ///
49 /// If `false`, the prepared statement will be closed after execution.
50 ///
51 /// Default: `true`.
52 pub fn set_persistent(mut self, persistent: bool) -> Self {
53 self.persistent = persistent;
54 self
55 }
56}
57impl<'q, DB> SqlTemplateExecute<'q, DB>
58where
59 DB: Database + HasStatementCache,
60 DB::Arguments<'q>: IntoArguments<'q, DB>,
61{
62 /// to sqlx_core::QueryAs
63 /// Converts the SQL template to a `QueryAs` object, which can be executed to fetch rows
64 #[inline]
65 pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments<'q>>
66 where
67 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
68 {
69 let q = match self.arguments {
70 Some(args) => query_as_with(self.sql, args),
71 None => query_as(self.sql),
72 };
73 q.persistent(self.persistent)
74 }
75 /// to sqlx_core::Query
76 /// Converts the SQL template to a `Query` object, which can be executed to fetch rows
77 #[inline]
78 pub fn to_query(self) -> Query<'q, DB, DB::Arguments<'q>> {
79 let q = match self.arguments {
80 Some(args) => {
81 // let wrap = ArgWrapper(args);
82 query_with(self.sql, args)
83 }
84 None => query(self.sql),
85 };
86 q.persistent(self.persistent)
87 }
88 /// like sqlx_core::Query::map
89 /// Map each row in the result to another type.
90 #[inline]
91 pub fn map<F, O>(
92 self,
93 f: F,
94 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments<'q>>
95 where
96 F: FnMut(DB::Row) -> O + Send,
97 O: Unpin,
98 {
99 self.to_query().map(f)
100 }
101
102 /// like sqlx_core::Query::try_map
103 /// Map each row in the result to another type, returning an error if the mapping fails.
104 #[inline]
105 pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments<'q>>
106 where
107 F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
108 O: Unpin,
109 {
110 self.to_query().try_map(f)
111 }
112}
113impl<'q, DB> SqlTemplateExecute<'q, DB>
114where
115 DB: Database,
116{
117 /// like sqlx_core::Query::execute
118 /// Execute the query and return the number of rows affected.
119 #[inline]
120 pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
121 where
122 'q: 'e,
123 E: Executor<'c, Database = DB>,
124 {
125 executor.execute(self).await
126 }
127 /// like sqlx_core::Query::execute_many
128 /// Execute multiple queries and return the rows affected from each query, in a stream.
129 #[inline]
130 pub fn execute_many<'e, 'c: 'e, E>(
131 self,
132 executor: E,
133 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
134 where
135 'q: 'e,
136 E: Executor<'c, Database = DB>,
137 {
138 #[allow(deprecated)]
139 executor.execute_many(self)
140 }
141 /// like sqlx_core::Query::fetch
142 /// Execute the query and return the generated results as a stream.
143 #[inline]
144 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
145 where
146 'q: 'e,
147 E: Executor<'c, Database = DB>,
148 {
149 executor.fetch(self)
150 }
151 /// like sqlx_core::Query::fetch_many
152 /// Execute multiple queries and return the generated results as a stream.
153 ///
154 /// For each query in the stream, any generated rows are returned first,
155 /// then the `QueryResult` with the number of rows affected.
156 #[inline]
157 #[allow(clippy::type_complexity)]
158 pub fn fetch_many<'e, 'c: 'e, E>(
159 self,
160 executor: E,
161 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
162 where
163 'q: 'e,
164 E: Executor<'c, Database = DB>,
165 {
166 #[allow(deprecated)]
167 executor.fetch_many(self)
168 }
169 /// like sqlx_core::Query::fetch_all
170 /// Execute the query and return all the resulting rows collected into a [`Vec`].
171 ///
172 /// ### Note: beware result set size.
173 /// This will attempt to collect the full result set of the query into memory.
174 ///
175 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
176 /// e.g. using `LIMIT`.
177 #[inline]
178 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
179 where
180 'q: 'e,
181 E: Executor<'c, Database = DB>,
182 {
183 executor.fetch_all(self).await
184 }
185 /// like sqlx_core::Query::fetch_one
186 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
187 ///
188 /// ### Note: for best performance, ensure the query returns at most one row.
189 /// Depending on the driver implementation, if your query can return more than one row,
190 /// it may lead to wasted CPU time and bandwidth on the database server.
191 ///
192 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
193 /// can result in a more optimal query plan.
194 ///
195 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
196 ///
197 /// Otherwise, you might want to add `LIMIT 1` to your query.
198 #[inline]
199 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
200 where
201 'q: 'e,
202 E: Executor<'c, Database = DB>,
203 {
204 executor.fetch_one(self).await
205 }
206 /// like sqlx_core::Query::fetch_optional
207 /// Execute the query, returning the first row or `None` otherwise.
208 ///
209 /// ### Note: for best performance, ensure the query returns at most one row.
210 /// Depending on the driver implementation, if your query can return more than one row,
211 /// it may lead to wasted CPU time and bandwidth on the database server.
212 ///
213 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
214 /// can result in a more optimal query plan.
215 ///
216 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
217 ///
218 /// Otherwise, you might want to add `LIMIT 1` to your query.
219 #[inline]
220 pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
221 where
222 'q: 'e,
223 E: Executor<'c, Database = DB>,
224 {
225 executor.fetch_optional(self).await
226 }
227
228 // QueryAs functions wrapp
229
230 /// like sqlx_core::QueryAs::fetch
231 /// Execute the query and return the generated results as a stream.
232 pub fn fetch_as<'e, 'c: 'e, O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
233 where
234 'q: 'e,
235 E: 'e + Executor<'c, Database = DB>,
236 DB: 'e,
237 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
238 {
239 self.fetch_many_as(executor)
240 .try_filter_map(|step| async move { Ok(step.right()) })
241 .boxed()
242 }
243 /// like sqlx_core::QueryAs::fetch_many
244 /// Execute multiple queries and return the generated results as a stream
245 /// from each query, in a stream.
246 pub fn fetch_many_as<'e, 'c: 'e, O, E>(
247 self,
248 executor: E,
249 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
250 where
251 'q: 'e,
252 E: 'e + Executor<'c, Database = DB>,
253 DB: 'e,
254 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
255 {
256 #[allow(deprecated)]
257 executor
258 .fetch_many(self)
259 .map(|v| match v {
260 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
261 Ok(Either::Left(v)) => Ok(Either::Left(v)),
262 Err(e) => Err(e),
263 })
264 .boxed()
265 }
266 /// like sqlx_core::QueryAs::fetch_all
267 /// Execute the query and return all the resulting rows collected into a [`Vec`].
268 ///
269 /// ### Note: beware result set size.
270 /// This will attempt to collect the full result set of the query into memory.
271 ///
272 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
273 /// e.g. using `LIMIT`.
274 #[inline]
275 pub async fn fetch_all_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Vec<O>, Error>
276 where
277 'q: 'e,
278 E: 'e + Executor<'c, Database = DB>,
279 DB: 'e,
280 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
281 {
282 self.fetch_as(executor).try_collect().await
283 }
284 /// like sqlx_core::QueryAs::fetch_one
285 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
286 ///
287 /// ### Note: for best performance, ensure the query returns at most one row.
288 /// Depending on the driver implementation, if your query can return more than one row,
289 /// it may lead to wasted CPU time and bandwidth on the database server.
290 ///
291 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
292 /// can result in a more optimal query plan.
293 ///
294 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
295 ///
296 /// Otherwise, you might want to add `LIMIT 1` to your query.
297 pub async fn fetch_one_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<O, Error>
298 where
299 'q: 'e,
300 E: 'e + Executor<'c, Database = DB>,
301 DB: 'e,
302 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
303 {
304 self.fetch_optional_as(executor)
305 .await
306 .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
307 }
308 /// like sqlx_core_core::QueryAs::fetch_optional
309 /// Execute the query, returning the first row or `None` otherwise.
310 ///
311 /// ### Note: for best performance, ensure the query returns at most one row.
312 /// Depending on the driver implementation, if your query can return more than one row,
313 /// it may lead to wasted CPU time and bandwidth on the database server.
314 ///
315 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
316 /// can result in a more optimal query plan.
317 ///
318 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
319 ///
320 /// Otherwise, you might want to add `LIMIT 1` to your query.
321 pub async fn fetch_optional_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Option<O>, Error>
322 where
323 'q: 'e,
324 E: 'e + Executor<'c, Database = DB>,
325 DB: 'e,
326 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
327 {
328 let row = executor.fetch_optional(self).await?;
329 if let Some(row) = row {
330 O::from_row(&row).map(Some)
331 } else {
332 Ok(None)
333 }
334 }
335}
336
337impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<'q, DB> {
338 /// Returns the SQL query string
339 #[inline]
340 fn sql(&self) -> &'q str {
341 self.sql
342 }
343
344 /// Gets prepared statement (not supported in this implementation)
345 #[inline]
346 fn statement(&self) -> Option<&DB::Statement<'q>> {
347 None
348 }
349
350 /// Takes ownership of the bound arguments
351 #[inline]
352 fn take_arguments(
353 &mut self,
354 ) -> Result<Option<DB::Arguments<'q>>, sqlx_core::error::BoxDynError> {
355 Ok(self.arguments.take())
356 }
357
358 /// Checks if query is persistent
359 #[inline]
360 fn persistent(&self) -> bool {
361 self.persistent
362 }
363}