sqlx_askama_template/v3/
sql_templte_execute.rs

1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4    Either, Error,
5    arguments::IntoArguments,
6    database::{Database, HasStatementCache},
7    executor::{Execute, Executor},
8    from_row::FromRow,
9    query::{Map, Query, query, query_with},
10    query_as::{QueryAs, query_as, query_as_with},
11};
12/// Internal executor for SQL templates
13pub struct SqlTemplateExecute<'q, DB: Database> {
14    /// Reference to SQL query string
15    pub(crate) sql: &'q String,
16    /// SQL parameters
17    pub(crate) arguments: Option<DB::Arguments<'q>>,
18    /// Persistent flag
19    pub(crate) persistent: bool,
20}
21impl<'q, DB: Database> Clone for SqlTemplateExecute<'q, DB>
22where
23    DB::Arguments<'q>: Clone,
24{
25    fn clone(&self) -> Self {
26        SqlTemplateExecute {
27            sql: self.sql,
28            arguments: self.arguments.clone(),
29            persistent: self.persistent,
30        }
31    }
32}
33impl<'q, DB: Database> SqlTemplateExecute<'q, DB> {
34    /// Creates a new SQL template executor
35    pub fn new(sql: &'q String, arguments: Option<DB::Arguments<'q>>) -> Self {
36        SqlTemplateExecute {
37            sql,
38            arguments,
39            persistent: true,
40        }
41    }
42    /// If `true`, the statement will get prepared once and cached to the
43    /// connection's statement cache.
44    ///
45    /// If queried once with the flag set to `true`, all subsequent queries
46    /// matching the one with the flag will use the cached statement until the
47    /// cache is cleared.
48    ///
49    /// If `false`, the prepared statement will be closed after execution.
50    ///
51    /// Default: `true`.
52    pub fn set_persistent(mut self, persistent: bool) -> Self {
53        self.persistent = persistent;
54        self
55    }
56}
57impl<'q, DB> SqlTemplateExecute<'q, DB>
58where
59    DB: Database + HasStatementCache,
60    DB::Arguments<'q>: IntoArguments<'q, DB>,
61{
62    /// to sqlx_core::QueryAs
63    /// Converts the SQL template to a `QueryAs` object, which can be executed to fetch rows
64    #[inline]
65    pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments<'q>>
66    where
67        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
68    {
69        let q = match self.arguments {
70            Some(args) => query_as_with(self.sql, args),
71            None => query_as(self.sql),
72        };
73        q.persistent(self.persistent)
74    }
75    /// to sqlx_core::Query
76    /// Converts the SQL template to a `Query` object, which can be executed to fetch rows
77    #[inline]
78    pub fn to_query(self) -> Query<'q, DB, DB::Arguments<'q>> {
79        let q = match self.arguments {
80            Some(args) => {
81                //   let wrap = ArgWrapper(args);
82                query_with(self.sql, args)
83            }
84            None => query(self.sql),
85        };
86        q.persistent(self.persistent)
87    }
88    /// like sqlx_core::Query::map
89    /// Map each row in the result to another type.
90    #[inline]
91    pub fn map<F, O>(
92        self,
93        f: F,
94    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments<'q>>
95    where
96        F: FnMut(DB::Row) -> O + Send,
97        O: Unpin,
98    {
99        self.to_query().map(f)
100    }
101
102    /// like sqlx_core::Query::try_map
103    /// Map each row in the result to another type, returning an error if the mapping fails.
104    #[inline]
105    pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments<'q>>
106    where
107        F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
108        O: Unpin,
109    {
110        self.to_query().try_map(f)
111    }
112}
113impl<'q, DB> SqlTemplateExecute<'q, DB>
114where
115    DB: Database,
116{
117    /// like sqlx_core::Query::execute
118    /// Execute the query and return the number of rows affected.
119    #[inline]
120    pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
121    where
122        'q: 'e,
123        E: Executor<'c, Database = DB>,
124    {
125        executor.execute(self).await
126    }
127    /// like    sqlx_core::Query::execute_many
128    /// Execute multiple queries and return the rows affected from each query, in a stream.
129    #[inline]
130    pub fn execute_many<'e, 'c: 'e, E>(
131        self,
132        executor: E,
133    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
134    where
135        'q: 'e,
136        E: Executor<'c, Database = DB>,
137    {
138        #[allow(deprecated)]
139        executor.execute_many(self)
140    }
141    /// like sqlx_core::Query::fetch
142    /// Execute the query and return the generated results as a stream.
143    #[inline]
144    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
145    where
146        'q: 'e,
147        E: Executor<'c, Database = DB>,
148    {
149        executor.fetch(self)
150    }
151    /// like sqlx_core::Query::fetch_many
152    /// Execute multiple queries and return the generated results as a stream.
153    ///
154    /// For each query in the stream, any generated rows are returned first,
155    /// then the `QueryResult` with the number of rows affected.
156    #[inline]
157    #[allow(clippy::type_complexity)]
158    pub fn fetch_many<'e, 'c: 'e, E>(
159        self,
160        executor: E,
161    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
162    where
163        'q: 'e,
164        E: Executor<'c, Database = DB>,
165    {
166        #[allow(deprecated)]
167        executor.fetch_many(self)
168    }
169    /// like sqlx_core::Query::fetch_all
170    /// Execute the query and return all the resulting rows collected into a [`Vec`].
171    ///
172    /// ### Note: beware result set size.
173    /// This will attempt to collect the full result set of the query into memory.
174    ///
175    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
176    /// e.g. using `LIMIT`.
177    #[inline]
178    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
179    where
180        'q: 'e,
181        E: Executor<'c, Database = DB>,
182    {
183        executor.fetch_all(self).await
184    }
185    /// like sqlx_core::Query::fetch_one
186    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
187    ///
188    /// ### Note: for best performance, ensure the query returns at most one row.
189    /// Depending on the driver implementation, if your query can return more than one row,
190    /// it may lead to wasted CPU time and bandwidth on the database server.
191    ///
192    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
193    /// can result in a more optimal query plan.
194    ///
195    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
196    ///
197    /// Otherwise, you might want to add `LIMIT 1` to your query.
198    #[inline]
199    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
200    where
201        'q: 'e,
202        E: Executor<'c, Database = DB>,
203    {
204        executor.fetch_one(self).await
205    }
206    /// like sqlx_core::Query::fetch_optional
207    /// Execute the query, returning the first row or `None` otherwise.
208    ///
209    /// ### Note: for best performance, ensure the query returns at most one row.
210    /// Depending on the driver implementation, if your query can return more than one row,
211    /// it may lead to wasted CPU time and bandwidth on the database server.
212    ///
213    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
214    /// can result in a more optimal query plan.
215    ///
216    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
217    ///
218    /// Otherwise, you might want to add `LIMIT 1` to your query.
219    #[inline]
220    pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
221    where
222        'q: 'e,
223        E: Executor<'c, Database = DB>,
224    {
225        executor.fetch_optional(self).await
226    }
227
228    // QueryAs functions wrapp
229
230    /// like sqlx_core::QueryAs::fetch
231    /// Execute the query and return the generated results as a stream.
232    pub fn fetch_as<'e, 'c: 'e, O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
233    where
234        'q: 'e,
235        E: 'e + Executor<'c, Database = DB>,
236        DB: 'e,
237        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
238    {
239        self.fetch_many_as(executor)
240            .try_filter_map(|step| async move { Ok(step.right()) })
241            .boxed()
242    }
243    /// like sqlx_core::QueryAs::fetch_many
244    /// Execute multiple queries and return the generated results as a stream
245    /// from each query, in a stream.
246    pub fn fetch_many_as<'e, 'c: 'e, O, E>(
247        self,
248        executor: E,
249    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
250    where
251        'q: 'e,
252        E: 'e + Executor<'c, Database = DB>,
253        DB: 'e,
254        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
255    {
256        #[allow(deprecated)]
257        executor
258            .fetch_many(self)
259            .map(|v| match v {
260                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
261                Ok(Either::Left(v)) => Ok(Either::Left(v)),
262                Err(e) => Err(e),
263            })
264            .boxed()
265    }
266    /// like sqlx_core::QueryAs::fetch_all
267    /// Execute the query and return all the resulting rows collected into a [`Vec`].
268    ///
269    /// ### Note: beware result set size.
270    /// This will attempt to collect the full result set of the query into memory.
271    ///
272    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
273    /// e.g. using `LIMIT`.
274    #[inline]
275    pub async fn fetch_all_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Vec<O>, Error>
276    where
277        'q: 'e,
278        E: 'e + Executor<'c, Database = DB>,
279        DB: 'e,
280        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
281    {
282        self.fetch_as(executor).try_collect().await
283    }
284    /// like sqlx_core::QueryAs::fetch_one
285    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
286    ///
287    /// ### Note: for best performance, ensure the query returns at most one row.
288    /// Depending on the driver implementation, if your query can return more than one row,
289    /// it may lead to wasted CPU time and bandwidth on the database server.
290    ///
291    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
292    /// can result in a more optimal query plan.
293    ///
294    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
295    ///
296    /// Otherwise, you might want to add `LIMIT 1` to your query.
297    pub async fn fetch_one_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<O, Error>
298    where
299        'q: 'e,
300        E: 'e + Executor<'c, Database = DB>,
301        DB: 'e,
302        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
303    {
304        self.fetch_optional_as(executor)
305            .await
306            .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
307    }
308    /// like sqlx_core_core::QueryAs::fetch_optional
309    /// Execute the query, returning the first row or `None` otherwise.
310    ///
311    /// ### Note: for best performance, ensure the query returns at most one row.
312    /// Depending on the driver implementation, if your query can return more than one row,
313    /// it may lead to wasted CPU time and bandwidth on the database server.
314    ///
315    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
316    /// can result in a more optimal query plan.
317    ///
318    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
319    ///
320    /// Otherwise, you might want to add `LIMIT 1` to your query.
321    pub async fn fetch_optional_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Option<O>, Error>
322    where
323        'q: 'e,
324        E: 'e + Executor<'c, Database = DB>,
325        DB: 'e,
326        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
327    {
328        let row = executor.fetch_optional(self).await?;
329        if let Some(row) = row {
330            O::from_row(&row).map(Some)
331        } else {
332            Ok(None)
333        }
334    }
335}
336
337impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<'q, DB> {
338    /// Returns the SQL query string
339    #[inline]
340    fn sql(&self) -> &'q str {
341        self.sql
342    }
343
344    /// Gets prepared statement (not supported in this implementation)
345    #[inline]
346    fn statement(&self) -> Option<&DB::Statement<'q>> {
347        None
348    }
349
350    /// Takes ownership of the bound arguments
351    #[inline]
352    fn take_arguments(
353        &mut self,
354    ) -> Result<Option<DB::Arguments<'q>>, sqlx_core::error::BoxDynError> {
355        Ok(self.arguments.take())
356    }
357
358    /// Checks if query is persistent
359    #[inline]
360    fn persistent(&self) -> bool {
361        self.persistent
362    }
363}