sqlx_askama_template/sql_template_execute.rs
1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4 Either, Error,
5 arguments::IntoArguments,
6 database::{Database, HasStatementCache},
7 executor::{Execute, Executor},
8 from_row::FromRow,
9 query::{Map, Query, query, query_with},
10 query_as::{QueryAs, query_as, query_as_with},
11 sql_str::{AssertSqlSafe, SqlSafeStr, SqlStr},
12};
13/// Internal executor for SQL templates
14pub struct SqlTemplateExecute<DB: Database> {
15 /// Reference to SQL query string
16 pub(crate) sql: String,
17 /// SQL parameters
18 pub(crate) arguments: Option<DB::Arguments>,
19 /// Persistent flag
20 pub(crate) persistent: bool,
21}
22impl<DB: Database> Clone for SqlTemplateExecute<DB>
23where
24 DB::Arguments: Clone,
25{
26 fn clone(&self) -> Self {
27 SqlTemplateExecute {
28 sql: self.sql.clone(),
29 arguments: self.arguments.clone(),
30 persistent: self.persistent,
31 }
32 }
33}
34impl<DB: Database> SqlTemplateExecute<DB> {
35 /// Creates a new SQL template executor
36 pub fn new(sql: String, arguments: Option<DB::Arguments>) -> Self {
37 SqlTemplateExecute {
38 sql,
39 arguments,
40 persistent: true,
41 }
42 }
43 /// If `true`, the statement will get prepared once and cached to the
44 /// connection's statement cache.
45 ///
46 /// If queried once with the flag set to `true`, all subsequent queries
47 /// matching the one with the flag will use the cached statement until the
48 /// cache is cleared.
49 ///
50 /// If `false`, the prepared statement will be closed after execution.
51 ///
52 /// Default: `true`.
53 pub fn set_persistent(mut self, persistent: bool) -> Self {
54 self.persistent = persistent;
55 self
56 }
57}
58impl<'q, DB> SqlTemplateExecute<DB>
59where
60 DB: Database + HasStatementCache,
61 DB::Arguments: IntoArguments<DB>,
62{
63 /// Converts the SQL template to a `sqlx_core::QueryAs` object, which can be executed to fetch rows.
64 #[inline]
65 pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments>
66 where
67 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
68 {
69 let q = match self.arguments {
70 Some(args) => query_as_with(AssertSqlSafe(self.sql), args),
71 None => query_as(AssertSqlSafe(self.sql)),
72 };
73 q.persistent(self.persistent)
74 }
75 /// Converts the SQL template to a `sqlx_core::Query` object, which can be executed to fetch rows.
76 #[inline]
77 pub fn to_query(self) -> Query<'q, DB, DB::Arguments> {
78 let q = match self.arguments {
79 Some(args) => {
80 // let wrap = ArgWrapper(args);
81 query_with(AssertSqlSafe(self.sql), args)
82 }
83 None => query(AssertSqlSafe(self.sql)),
84 };
85 q.persistent(self.persistent)
86 }
87 /// like sqlx_core::Query::map
88 /// Map each row in the result to another type.
89 #[inline]
90 pub fn map<F, O>(
91 self,
92 f: F,
93 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments>
94 where
95 F: FnMut(DB::Row) -> O + Send,
96 O: Unpin,
97 {
98 self.to_query().map(f)
99 }
100
101 /// like sqlx_core::Query::try_map
102 /// Map each row in the result to another type, returning an error if the mapping fails.
103 #[inline]
104 pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments>
105 where
106 F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
107 O: Unpin,
108 {
109 self.to_query().try_map(f)
110 }
111}
112impl<'c, 'e, DB> SqlTemplateExecute<DB>
113where
114 DB: Database,
115 'c: 'e,
116{
117 /// like sqlx_core::Query::execute
118 /// Execute the query and return the number of rows affected.
119 #[inline]
120 pub async fn execute<E>(self, executor: E) -> Result<DB::QueryResult, Error>
121 where
122 E: Executor<'c, Database = DB>,
123 {
124 executor.execute(self).await
125 }
126 /// like sqlx_core::Query::execute_many
127 /// Execute multiple queries and return the rows affected from each query, in a stream.
128 #[inline]
129 pub fn execute_many<E>(self, executor: E) -> BoxStream<'e, Result<DB::QueryResult, Error>>
130 where
131 E: Executor<'c, Database = DB>,
132 {
133 #[allow(deprecated)]
134 executor.execute_many(self)
135 }
136 /// like sqlx_core::Query::fetch
137 /// Execute the query and return the generated results as a stream.
138 #[inline]
139 pub fn fetch<E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
140 where
141 E: Executor<'c, Database = DB>,
142 {
143 executor.fetch(self)
144 }
145 /// like sqlx_core::Query::fetch_many
146 /// Execute multiple queries and return the generated results as a stream.
147 ///
148 /// For each query in the stream, any generated rows are returned first,
149 /// then the `QueryResult` with the number of rows affected.
150 #[inline]
151 #[allow(clippy::type_complexity)]
152 pub fn fetch_many<E>(
153 self,
154 executor: E,
155 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
156 where
157 E: Executor<'c, Database = DB>,
158 {
159 #[allow(deprecated)]
160 executor.fetch_many(self)
161 }
162 /// like sqlx_core::Query::fetch_all
163 /// Execute the query and return all the resulting rows collected into a [`Vec`].
164 ///
165 /// ### Note: beware result set size.
166 /// This will attempt to collect the full result set of the query into memory.
167 ///
168 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
169 /// e.g. using `LIMIT`.
170 #[inline]
171 pub async fn fetch_all<E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
172 where
173 E: Executor<'c, Database = DB>,
174 {
175 executor.fetch_all(self).await
176 }
177 /// like sqlx_core::Query::fetch_one
178 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
179 ///
180 /// ### Note: for best performance, ensure the query returns at most one row.
181 /// Depending on the driver implementation, if your query can return more than one row,
182 /// it may lead to wasted CPU time and bandwidth on the database server.
183 ///
184 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
185 /// can result in a more optimal query plan.
186 ///
187 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
188 ///
189 /// Otherwise, you might want to add `LIMIT 1` to your query.
190 #[inline]
191 pub async fn fetch_one<E>(self, executor: E) -> Result<DB::Row, Error>
192 where
193 E: Executor<'c, Database = DB>,
194 {
195 executor.fetch_one(self).await
196 }
197 /// like sqlx_core::Query::fetch_optional
198 /// Execute the query, returning the first row or `None` otherwise.
199 ///
200 /// ### Note: for best performance, ensure the query returns at most one row.
201 /// Depending on the driver implementation, if your query can return more than one row,
202 /// it may lead to wasted CPU time and bandwidth on the database server.
203 ///
204 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
205 /// can result in a more optimal query plan.
206 ///
207 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
208 ///
209 /// Otherwise, you might want to add `LIMIT 1` to your query.
210 #[inline]
211 pub async fn fetch_optional<E>(self, executor: E) -> Result<Option<DB::Row>, Error>
212 where
213 E: Executor<'c, Database = DB>,
214 {
215 executor.fetch_optional(self).await
216 }
217
218 // QueryAs functions wrap
219
220 /// like sqlx_core::QueryAs::fetch
221 /// Execute the query and return the generated results as a stream.
222 pub fn fetch_as<O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
223 where
224 E: 'e + Executor<'c, Database = DB>,
225 DB: 'e,
226 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
227 {
228 self.fetch_many_as(executor)
229 .try_filter_map(|step| async move { Ok(step.right()) })
230 .boxed()
231 }
232 /// like sqlx_core::QueryAs::fetch_many
233 /// Execute multiple queries and return the generated results as a stream
234 /// from each query, in a stream.
235 pub fn fetch_many_as<O, E>(
236 self,
237 executor: E,
238 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
239 where
240 E: 'e + Executor<'c, Database = DB>,
241 DB: 'e,
242 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
243 {
244 #[allow(deprecated)]
245 executor
246 .fetch_many(self)
247 .map(|v| match v {
248 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
249 Ok(Either::Left(v)) => Ok(Either::Left(v)),
250 Err(e) => Err(e),
251 })
252 .boxed()
253 }
254 /// like sqlx_core::QueryAs::fetch_all
255 /// Execute the query and return all the resulting rows collected into a [`Vec`].
256 ///
257 /// ### Note: beware result set size.
258 /// This will attempt to collect the full result set of the query into memory.
259 ///
260 /// To avoid exhausting available memory, ensure the result set has a known upper bound,
261 /// e.g. using `LIMIT`.
262 #[inline]
263 pub async fn fetch_all_as<O, E>(self, executor: E) -> Result<Vec<O>, Error>
264 where
265 E: 'e + Executor<'c, Database = DB>,
266 DB: 'e,
267 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
268 {
269 self.fetch_as(executor).try_collect().await
270 }
271 /// like sqlx_core::QueryAs::fetch_one
272 /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
273 ///
274 /// ### Note: for best performance, ensure the query returns at most one row.
275 /// Depending on the driver implementation, if your query can return more than one row,
276 /// it may lead to wasted CPU time and bandwidth on the database server.
277 ///
278 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
279 /// can result in a more optimal query plan.
280 ///
281 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
282 ///
283 /// Otherwise, you might want to add `LIMIT 1` to your query.
284 pub async fn fetch_one_as<O, E>(self, executor: E) -> Result<O, Error>
285 where
286 E: 'e + Executor<'c, Database = DB>,
287 DB: 'e,
288 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
289 {
290 self.fetch_optional_as(executor)
291 .await
292 .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
293 }
294 /// like sqlx_core::QueryAs::fetch_optional
295 /// Execute the query, returning the first row or `None` otherwise.
296 ///
297 /// ### Note: for best performance, ensure the query returns at most one row.
298 /// Depending on the driver implementation, if your query can return more than one row,
299 /// it may lead to wasted CPU time and bandwidth on the database server.
300 ///
301 /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
302 /// can result in a more optimal query plan.
303 ///
304 /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
305 ///
306 /// Otherwise, you might want to add `LIMIT 1` to your query.
307 pub async fn fetch_optional_as<O, E>(self, executor: E) -> Result<Option<O>, Error>
308 where
309 E: 'e + Executor<'c, Database = DB>,
310 DB: 'e,
311 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
312 {
313 let row = executor.fetch_optional(self).await?;
314 if let Some(row) = row {
315 O::from_row(&row).map(Some)
316 } else {
317 Ok(None)
318 }
319 }
320}
321
322impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<DB> {
323 /// Returns the SQL query string
324 #[inline]
325 fn sql(self) -> SqlStr {
326 tracing::debug!("Executing SQL: {}", self.sql);
327 AssertSqlSafe(self.sql).into_sql_str()
328 }
329
330 /// Gets prepared statement (not supported in this implementation)
331 #[inline]
332 fn statement(&self) -> Option<&DB::Statement> {
333 None
334 }
335
336 /// Takes ownership of the bound arguments
337 #[inline]
338 fn take_arguments(&mut self) -> Result<Option<DB::Arguments>, sqlx_core::error::BoxDynError> {
339 Ok(self.arguments.take())
340 }
341
342 /// Checks if query is persistent
343 #[inline]
344 fn persistent(&self) -> bool {
345 self.persistent
346 }
347}