sqlx_askama_template/sql_template_execute.rs
1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4 Either, Error,
5 arguments::IntoArguments,
6 database::{Database, HasStatementCache},
7 executor::{Execute, Executor},
8 from_row::FromRow,
9 query::{Map, Query, query, query_with},
10 query_as::{QueryAs, query_as, query_as_with},
11 sql_str::{AssertSqlSafe, SqlSafeStr, SqlStr},
12};
13/// Internal executor for SQL templates
14pub struct SqlTemplateExecute<DB: Database> {
15 /// Reference to SQL query string
16 pub(crate) sql: String,
17 /// SQL parameters
18 pub(crate) arguments: Option<DB::Arguments>,
19 /// Persistent flag
20 pub(crate) persistent: bool,
21}
22impl<DB: Database> Clone for SqlTemplateExecute<DB>
23where
24 DB::Arguments: Clone,
25{
26 fn clone(&self) -> Self {
27 SqlTemplateExecute {
28 sql: self.sql.clone(),
29 arguments: self.arguments.clone(),
30 persistent: self.persistent,
31 }
32 }
33}
34impl<DB: Database> SqlTemplateExecute<DB> {
35 /// Creates a new SQL template executor
36 pub fn new(sql: String, arguments: Option<DB::Arguments>) -> Self {
37 SqlTemplateExecute {
38 sql,
39 arguments,
40 persistent: true,
41 }
42 }
43 /// If `true`, the statement will get prepared once and cached to the
44 /// connection's statement cache.
45 ///
46 /// If queried once with the flag set to `true`, all subsequent queries
47 /// matching the one with the flag will use the cached statement until the
48 /// cache is cleared.
49 ///
50 /// If `false`, the prepared statement will be closed after execution.
51 ///
52 /// Default: `true`.
53 pub fn set_persistent(mut self, persistent: bool) -> Self {
54 self.persistent = persistent;
55 self
56 }
57}
58impl<'q, DB> SqlTemplateExecute<DB>
59where
60 DB: Database + HasStatementCache,
61 DB::Arguments: IntoArguments<DB>,
62{
63 /// to sqlx_core::QueryAs
64 /// Converts the SQL template to a `QueryAs` object, which can be executed to fetch rows
65 #[inline]
66 pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments>
67 where
68 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
69 {
70 let q = match self.arguments {
71 Some(args) => query_as_with(AssertSqlSafe(self.sql), args),
72 None => query_as(AssertSqlSafe(self.sql)),
73 };
74 q.persistent(self.persistent)
75 }
76 /// to sqlx_core::Query
77 /// Converts the SQL template to a `Query` object, which can be executed to fetch rows
78 #[inline]
79 pub fn to_query(self) -> Query<'q, DB, DB::Arguments> {
80 let q = match self.arguments {
81 Some(args) => {
82 // let wrap = ArgWrapper(args);
83 query_with(AssertSqlSafe(self.sql), args)
84 }
85 None => query(AssertSqlSafe(self.sql)),
86 };
87 q.persistent(self.persistent)
88 }
89 /// like sqlx_core::Query::map
90 /// Map each row in the result to another type.
91 #[inline]
92 pub fn map<F, O>(
93 self,
94 f: F,
95 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments>
96 where
97 F: FnMut(DB::Row) -> O + Send,
98 O: Unpin,
99 {
100 self.to_query().map(f)
101 }
102
103 /// like sqlx_core::Query::try_map
104 /// Map each row in the result to another type, returning an error if the mapping fails.
105 #[inline]
106 pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments>
107 where
108 F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
109 O: Unpin,
110 {
111 self.to_query().try_map(f)
112 }
113}
114impl<DB> SqlTemplateExecute<DB>
115where
116 DB: Database,
117{
118 /// like sqlx_core::Query::execute
119 /// Execute the query and return the number of rows affected.
120 #[inline]
121 pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
122 where
123 DB::Arguments: 'e,
124 E: Executor<'c, Database = DB>,
125 {
126 executor.execute(self).await
127 }
128 /// like sqlx_core::Query::execute_many
129 /// Execute multiple queries and return the rows affected from each query, in a stream.
130 #[inline]
131 pub fn execute_many<'e, 'c: 'e, E>(
132 self,
133 executor: E,
134 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
135 where
136 DB::Arguments: 'e,
137 E: Executor<'c, Database = DB>,
138 {
139 #[allow(deprecated)]
140 executor.execute_many(self)
141 }
142 /// like sqlx_core::Query::fetch
143 /// Execute the query and return the generated results as a stream.
144 #[inline]
145 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
146 where
147 DB::Arguments: 'e,
148 E: Executor<'c, Database = DB>,
149 {
150 executor.fetch(self)
151 }
152 /// like sqlx_core::Query::fetch_many
153 /// Execute multiple queries and return the generated results as a stream.
154 ///
155 /// For each query in the stream, any generated rows are returned first,
156 /// then the `QueryResult` with the number of rows affected.
157 #[inline]
158 #[allow(clippy::type_complexity)]
159 pub fn fetch_many<'e, 'c: 'e, E>(
160 self,
161 executor: E,
162 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
163 where
164 DB::Arguments: 'e,
165 E: Executor<'c, Database = DB>,
166 {
167 #[allow(deprecated)]
168 executor.fetch_many(self)
169 }
170 /// like sqlx_core::Query::fetch_all
171 /// Execute the query and return all the resulting rows collected into a [`Vec`].
172 ///
173 /// ### Note: beware result set size.
174 /// This will attempt to collect the full result set of the query into memory.
175 ///
176 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
177 /// e.g. using `LIMIT`.
178 #[inline]
179 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
180 where
181 DB::Arguments: 'e,
182 E: Executor<'c, Database = DB>,
183 {
184 executor.fetch_all(self).await
185 }
186 /// like sqlx_core::Query::fetch_one
187 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
188 ///
189 /// ### Note: for best performance, ensure the query returns at most one row.
190 /// Depending on the driver implementation, if your query can return more than one row,
191 /// it may lead to wasted CPU time and bandwidth on the database server.
192 ///
193 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
194 /// can result in a more optimal query plan.
195 ///
196 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
197 ///
198 /// Otherwise, you might want to add `LIMIT 1` to your query.
199 #[inline]
200 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
201 where
202 DB::Arguments: 'e,
203 E: Executor<'c, Database = DB>,
204 {
205 executor.fetch_one(self).await
206 }
207 /// like sqlx_core::Query::fetch_optional
208 /// Execute the query, returning the first row or `None` otherwise.
209 ///
210 /// ### Note: for best performance, ensure the query returns at most one row.
211 /// Depending on the driver implementation, if your query can return more than one row,
212 /// it may lead to wasted CPU time and bandwidth on the database server.
213 ///
214 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
215 /// can result in a more optimal query plan.
216 ///
217 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
218 ///
219 /// Otherwise, you might want to add `LIMIT 1` to your query.
220 #[inline]
221 pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
222 where
223 DB::Arguments: 'e,
224 E: Executor<'c, Database = DB>,
225 {
226 executor.fetch_optional(self).await
227 }
228
229 // QueryAs functions wrapp
230
231 /// like sqlx_core::QueryAs::fetch
232 /// Execute the query and return the generated results as a stream.
233 pub fn fetch_as<'e, 'c: 'e, O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
234 where
235 DB::Arguments: 'e,
236 E: 'e + Executor<'c, Database = DB>,
237 DB: 'e,
238 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
239 {
240 self.fetch_many_as(executor)
241 .try_filter_map(|step| async move { Ok(step.right()) })
242 .boxed()
243 }
244 /// like sqlx_core::QueryAs::fetch_many
245 /// Execute multiple queries and return the generated results as a stream
246 /// from each query, in a stream.
247 pub fn fetch_many_as<'e, 'c: 'e, O, E>(
248 self,
249 executor: E,
250 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
251 where
252 DB::Arguments: 'e,
253 E: 'e + Executor<'c, Database = DB>,
254 DB: 'e,
255 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
256 {
257 #[allow(deprecated)]
258 executor
259 .fetch_many(self)
260 .map(|v| match v {
261 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
262 Ok(Either::Left(v)) => Ok(Either::Left(v)),
263 Err(e) => Err(e),
264 })
265 .boxed()
266 }
267 /// like sqlx_core::QueryAs::fetch_all
268 /// Execute the query and return all the resulting rows collected into a [`Vec`].
269 ///
270 /// ### Note: beware result set size.
271 /// This will attempt to collect the full result set of the query into memory.
272 ///
273 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
274 /// e.g. using `LIMIT`.
275 #[inline]
276 pub async fn fetch_all_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Vec<O>, Error>
277 where
278 DB::Arguments: 'e,
279 E: 'e + Executor<'c, Database = DB>,
280 DB: 'e,
281 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
282 {
283 self.fetch_as(executor).try_collect().await
284 }
285 /// like sqlx_core::QueryAs::fetch_one
286 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
287 ///
288 /// ### Note: for best performance, ensure the query returns at most one row.
289 /// Depending on the driver implementation, if your query can return more than one row,
290 /// it may lead to wasted CPU time and bandwidth on the database server.
291 ///
292 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
293 /// can result in a more optimal query plan.
294 ///
295 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
296 ///
297 /// Otherwise, you might want to add `LIMIT 1` to your query.
298 pub async fn fetch_one_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<O, Error>
299 where
300 DB::Arguments: 'e,
301 E: 'e + Executor<'c, Database = DB>,
302 DB: 'e,
303 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
304 {
305 self.fetch_optional_as(executor)
306 .await
307 .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
308 }
309 /// like sqlx_core_core::QueryAs::fetch_optional
310 /// Execute the query, returning the first row or `None` otherwise.
311 ///
312 /// ### Note: for best performance, ensure the query returns at most one row.
313 /// Depending on the driver implementation, if your query can return more than one row,
314 /// it may lead to wasted CPU time and bandwidth on the database server.
315 ///
316 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
317 /// can result in a more optimal query plan.
318 ///
319 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
320 ///
321 /// Otherwise, you might want to add `LIMIT 1` to your query.
322 pub async fn fetch_optional_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Option<O>, Error>
323 where
324 DB::Arguments: 'e,
325 E: 'e + Executor<'c, Database = DB>,
326 DB: 'e,
327 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
328 {
329 let row = executor.fetch_optional(self).await?;
330 if let Some(row) = row {
331 O::from_row(&row).map(Some)
332 } else {
333 Ok(None)
334 }
335 }
336}
337
338impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<DB> {
339 /// Returns the SQL query string
340 #[inline]
341 fn sql(self) -> SqlStr {
342 tracing::debug!("Executing SQL: {}", self.sql);
343 AssertSqlSafe(self.sql).into_sql_str()
344 }
345
346 /// Gets prepared statement (not supported in this implementation)
347 #[inline]
348 fn statement(&self) -> Option<&DB::Statement> {
349 None
350 }
351
352 /// Takes ownership of the bound arguments
353 #[inline]
354 fn take_arguments(&mut self) -> Result<Option<DB::Arguments>, sqlx_core::error::BoxDynError> {
355 Ok(self.arguments.take())
356 }
357
358 /// Checks if query is persistent
359 #[inline]
360 fn persistent(&self) -> bool {
361 self.persistent
362 }
363}