sqlx_askama_template/v3/
sql_template_execute.rs

1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4    Either, Error,
5    arguments::IntoArguments,
6    database::{Database, HasStatementCache},
7    executor::{Execute, Executor},
8    from_row::FromRow,
9    query::{Map, Query, query, query_with},
10    query_as::{QueryAs, query_as, query_as_with},
11};
12/// Internal executor for SQL templates
13pub struct SqlTemplateExecute<'q, DB: Database> {
14    /// Reference to SQL query string
15    pub(crate) sql: &'q str,
16    /// SQL parameters
17    pub(crate) arguments: Option<DB::Arguments<'q>>,
18    /// Persistent flag
19    pub(crate) persistent: bool,
20}
21impl<'q, DB: Database> Clone for SqlTemplateExecute<'q, DB>
22where
23    DB::Arguments<'q>: Clone,
24{
25    fn clone(&self) -> Self {
26        SqlTemplateExecute {
27            sql: self.sql,
28            arguments: self.arguments.clone(),
29            persistent: self.persistent,
30        }
31    }
32}
33impl<'q, DB: Database> SqlTemplateExecute<'q, DB> {
34    /// Creates a new SQL template executor
35    pub fn new(sql: &'q str, arguments: Option<DB::Arguments<'q>>) -> Self {
36        SqlTemplateExecute {
37            sql,
38            arguments,
39            persistent: true,
40        }
41    }
42    /// If `true`, the statement will get prepared once and cached to the
43    /// connection's statement cache.
44    ///
45    /// If queried once with the flag set to `true`, all subsequent queries
46    /// matching the one with the flag will use the cached statement until the
47    /// cache is cleared.
48    ///
49    /// If `false`, the prepared statement will be closed after execution.
50    ///
51    /// Default: `true`.
52    pub fn set_persistent(mut self, persistent: bool) -> Self {
53        self.persistent = persistent;
54        self
55    }
56}
57impl<'q, DB> SqlTemplateExecute<'q, DB>
58where
59    DB: Database + HasStatementCache,
60    DB::Arguments<'q>: IntoArguments<'q, DB>,
61{
62    /// to sqlx_core::QueryAs
63    /// Converts the SQL template to a `QueryAs` object, which can be executed to fetch rows
64    #[inline]
65    pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments<'q>>
66    where
67        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
68    {
69        let q = match self.arguments {
70            Some(args) => query_as_with(self.sql, args),
71            None => query_as(self.sql),
72        };
73        q.persistent(self.persistent)
74    }
75    /// to sqlx_core::Query
76    /// Converts the SQL template to a `Query` object, which can be executed to fetch rows
77    #[inline]
78    pub fn to_query(self) -> Query<'q, DB, DB::Arguments<'q>> {
79        let q = match self.arguments {
80            Some(args) => {
81                //   let wrap = ArgWrapper(args);
82                query_with(self.sql, args)
83            }
84            None => query(self.sql),
85        };
86        q.persistent(self.persistent)
87    }
88    /// like sqlx_core::Query::map
89    /// Map each row in the result to another type.
90    #[inline]
91    pub fn map<F, O>(
92        self,
93        f: F,
94    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments<'q>>
95    where
96        F: FnMut(DB::Row) -> O + Send,
97        O: Unpin,
98    {
99        self.to_query().map(f)
100    }
101
102    /// like sqlx_core::Query::try_map
103    /// Map each row in the result to another type, returning an error if the mapping fails.
104    #[inline]
105    pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments<'q>>
106    where
107        F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
108        O: Unpin,
109    {
110        self.to_query().try_map(f)
111    }
112}
113impl<'q, DB> SqlTemplateExecute<'q, DB>
114where
115    DB: Database,
116{
117    /// like sqlx_core::Query::execute
118    /// Execute the query and return the number of rows affected.
119    #[inline]
120    pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
121    where
122        'q: 'e,
123        DB::Arguments<'q>: 'e,
124        E: Executor<'c, Database = DB>,
125    {
126        executor.execute(self).await
127    }
128    /// like    sqlx_core::Query::execute_many
129    /// Execute multiple queries and return the rows affected from each query, in a stream.
130    #[inline]
131    pub fn execute_many<'e, 'c: 'e, E>(
132        self,
133        executor: E,
134    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
135    where
136        'q: 'e,
137        DB::Arguments<'q>: 'e,
138        E: Executor<'c, Database = DB>,
139    {
140        #[allow(deprecated)]
141        executor.execute_many(self)
142    }
143    /// like sqlx_core::Query::fetch
144    /// Execute the query and return the generated results as a stream.
145    #[inline]
146    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
147    where
148        'q: 'e,
149        DB::Arguments<'q>: 'e,
150        E: Executor<'c, Database = DB>,
151    {
152        executor.fetch(self)
153    }
154    /// like sqlx_core::Query::fetch_many
155    /// Execute multiple queries and return the generated results as a stream.
156    ///
157    /// For each query in the stream, any generated rows are returned first,
158    /// then the `QueryResult` with the number of rows affected.
159    #[inline]
160    #[allow(clippy::type_complexity)]
161    pub fn fetch_many<'e, 'c: 'e, E>(
162        self,
163        executor: E,
164    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
165    where
166        'q: 'e,
167        DB::Arguments<'q>: 'e,
168        E: Executor<'c, Database = DB>,
169    {
170        #[allow(deprecated)]
171        executor.fetch_many(self)
172    }
173    /// like sqlx_core::Query::fetch_all
174    /// Execute the query and return all the resulting rows collected into a [`Vec`].
175    ///
176    /// ### Note: beware result set size.
177    /// This will attempt to collect the full result set of the query into memory.
178    ///
179    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
180    /// e.g. using `LIMIT`.
181    #[inline]
182    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
183    where
184        'q: 'e,
185        DB::Arguments<'q>: 'e,
186        E: Executor<'c, Database = DB>,
187    {
188        executor.fetch_all(self).await
189    }
190    /// like sqlx_core::Query::fetch_one
191    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
192    ///
193    /// ### Note: for best performance, ensure the query returns at most one row.
194    /// Depending on the driver implementation, if your query can return more than one row,
195    /// it may lead to wasted CPU time and bandwidth on the database server.
196    ///
197    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
198    /// can result in a more optimal query plan.
199    ///
200    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
201    ///
202    /// Otherwise, you might want to add `LIMIT 1` to your query.
203    #[inline]
204    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
205    where
206        'q: 'e,
207        DB::Arguments<'q>: 'e,
208        E: Executor<'c, Database = DB>,
209    {
210        executor.fetch_one(self).await
211    }
212    /// like sqlx_core::Query::fetch_optional
213    /// Execute the query, returning the first row or `None` otherwise.
214    ///
215    /// ### Note: for best performance, ensure the query returns at most one row.
216    /// Depending on the driver implementation, if your query can return more than one row,
217    /// it may lead to wasted CPU time and bandwidth on the database server.
218    ///
219    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
220    /// can result in a more optimal query plan.
221    ///
222    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
223    ///
224    /// Otherwise, you might want to add `LIMIT 1` to your query.
225    #[inline]
226    pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
227    where
228        'q: 'e,
229        DB::Arguments<'q>: 'e,
230        E: Executor<'c, Database = DB>,
231    {
232        executor.fetch_optional(self).await
233    }
234
235    // QueryAs functions wrapp
236
237    /// like sqlx_core::QueryAs::fetch
238    /// Execute the query and return the generated results as a stream.
239    pub fn fetch_as<'e, 'c: 'e, O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
240    where
241        'q: 'e,
242        DB::Arguments<'q>: 'e,
243        E: 'e + Executor<'c, Database = DB>,
244        DB: 'e,
245        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
246    {
247        self.fetch_many_as(executor)
248            .try_filter_map(|step| async move { Ok(step.right()) })
249            .boxed()
250    }
251    /// like sqlx_core::QueryAs::fetch_many
252    /// Execute multiple queries and return the generated results as a stream
253    /// from each query, in a stream.
254    pub fn fetch_many_as<'e, 'c: 'e, O, E>(
255        self,
256        executor: E,
257    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
258    where
259        'q: 'e,
260        DB::Arguments<'q>: 'e,
261        E: 'e + Executor<'c, Database = DB>,
262        DB: 'e,
263        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
264    {
265        #[allow(deprecated)]
266        executor
267            .fetch_many(self)
268            .map(|v| match v {
269                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
270                Ok(Either::Left(v)) => Ok(Either::Left(v)),
271                Err(e) => Err(e),
272            })
273            .boxed()
274    }
275    /// like sqlx_core::QueryAs::fetch_all
276    /// Execute the query and return all the resulting rows collected into a [`Vec`].
277    ///
278    /// ### Note: beware result set size.
279    /// This will attempt to collect the full result set of the query into memory.
280    ///
281    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
282    /// e.g. using `LIMIT`.
283    #[inline]
284    pub async fn fetch_all_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Vec<O>, Error>
285    where
286        'q: 'e,
287        DB::Arguments<'q>: 'e,
288        E: 'e + Executor<'c, Database = DB>,
289        DB: 'e,
290        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
291    {
292        self.fetch_as(executor).try_collect().await
293    }
294    /// like sqlx_core::QueryAs::fetch_one
295    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
296    ///
297    /// ### Note: for best performance, ensure the query returns at most one row.
298    /// Depending on the driver implementation, if your query can return more than one row,
299    /// it may lead to wasted CPU time and bandwidth on the database server.
300    ///
301    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
302    /// can result in a more optimal query plan.
303    ///
304    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
305    ///
306    /// Otherwise, you might want to add `LIMIT 1` to your query.
307    pub async fn fetch_one_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<O, Error>
308    where
309        'q: 'e,
310        DB::Arguments<'q>: 'e,
311        E: 'e + Executor<'c, Database = DB>,
312        DB: 'e,
313        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
314    {
315        self.fetch_optional_as(executor)
316            .await
317            .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
318    }
319    /// like sqlx_core_core::QueryAs::fetch_optional
320    /// Execute the query, returning the first row or `None` otherwise.
321    ///
322    /// ### Note: for best performance, ensure the query returns at most one row.
323    /// Depending on the driver implementation, if your query can return more than one row,
324    /// it may lead to wasted CPU time and bandwidth on the database server.
325    ///
326    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
327    /// can result in a more optimal query plan.
328    ///
329    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
330    ///
331    /// Otherwise, you might want to add `LIMIT 1` to your query.
332    pub async fn fetch_optional_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Option<O>, Error>
333    where
334        'q: 'e,
335        DB::Arguments<'q>: 'e,
336        E: 'e + Executor<'c, Database = DB>,
337        DB: 'e,
338        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
339    {
340        let row = executor.fetch_optional(self).await?;
341        if let Some(row) = row {
342            O::from_row(&row).map(Some)
343        } else {
344            Ok(None)
345        }
346    }
347}
348
349impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<'q, DB> {
350    /// Returns the SQL query string
351    #[inline]
352    fn sql(&self) -> &'q str {
353        log::debug!("Executing SQL: {}", self.sql);
354        self.sql
355    }
356
357    /// Gets prepared statement (not supported in this implementation)
358    #[inline]
359    fn statement(&self) -> Option<&DB::Statement<'q>> {
360        None
361    }
362
363    /// Takes ownership of the bound arguments
364    #[inline]
365    fn take_arguments(
366        &mut self,
367    ) -> Result<Option<DB::Arguments<'q>>, sqlx_core::error::BoxDynError> {
368        Ok(self.arguments.take())
369    }
370
371    /// Checks if query is persistent
372    #[inline]
373    fn persistent(&self) -> bool {
374        self.persistent
375    }
376}