Skip to main content

sqlx_askama_template/
sql_template_execute.rs

1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4    Either, Error,
5    arguments::IntoArguments,
6    database::{Database, HasStatementCache},
7    executor::{Execute, Executor},
8    from_row::FromRow,
9    query::{Map, Query, query, query_with},
10    query_as::{QueryAs, query_as, query_as_with},
11    sql_str::{AssertSqlSafe, SqlSafeStr, SqlStr},
12};
13/// Internal executor for SQL templates
14pub struct SqlTemplateExecute<DB: Database> {
15    /// Reference to SQL query string
16    pub(crate) sql: String,
17    /// SQL parameters
18    pub(crate) arguments: Option<DB::Arguments>,
19    /// Persistent flag
20    pub(crate) persistent: bool,
21}
22impl<DB: Database> Clone for SqlTemplateExecute<DB>
23where
24    DB::Arguments: Clone,
25{
26    fn clone(&self) -> Self {
27        SqlTemplateExecute {
28            sql: self.sql.clone(),
29            arguments: self.arguments.clone(),
30            persistent: self.persistent,
31        }
32    }
33}
34impl<DB: Database> SqlTemplateExecute<DB> {
35    /// Creates a new SQL template executor
36    pub fn new(sql: String, arguments: Option<DB::Arguments>) -> Self {
37        SqlTemplateExecute {
38            sql,
39            arguments,
40            persistent: true,
41        }
42    }
43    /// If `true`, the statement will get prepared once and cached to the
44    /// connection's statement cache.
45    ///
46    /// If queried once with the flag set to `true`, all subsequent queries
47    /// matching the one with the flag will use the cached statement until the
48    /// cache is cleared.
49    ///
50    /// If `false`, the prepared statement will be closed after execution.
51    ///
52    /// Default: `true`.
53    pub fn set_persistent(mut self, persistent: bool) -> Self {
54        self.persistent = persistent;
55        self
56    }
57}
58impl<'q, DB> SqlTemplateExecute<DB>
59where
60    DB: Database + HasStatementCache,
61    DB::Arguments: IntoArguments<DB>,
62{
63    /// Converts the SQL template to a `sqlx_core::QueryAs` object, which can be executed to fetch rows.
64    #[inline]
65    pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments>
66    where
67        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
68    {
69        let q = match self.arguments {
70            Some(args) => query_as_with(AssertSqlSafe(self.sql), args),
71            None => query_as(AssertSqlSafe(self.sql)),
72        };
73        q.persistent(self.persistent)
74    }
75    /// Converts the SQL template to a `sqlx_core::Query` object, which can be executed to fetch rows.
76    #[inline]
77    pub fn to_query(self) -> Query<'q, DB, DB::Arguments> {
78        let q = match self.arguments {
79            Some(args) => {
80                //   let wrap = ArgWrapper(args);
81                query_with(AssertSqlSafe(self.sql), args)
82            }
83            None => query(AssertSqlSafe(self.sql)),
84        };
85        q.persistent(self.persistent)
86    }
87    /// like sqlx_core::Query::map
88    /// Map each row in the result to another type.
89    #[inline]
90    pub fn map<F, O>(
91        self,
92        f: F,
93    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments>
94    where
95        F: FnMut(DB::Row) -> O + Send,
96        O: Unpin,
97    {
98        self.to_query().map(f)
99    }
100
101    /// like sqlx_core::Query::try_map
102    /// Map each row in the result to another type, returning an error if the mapping fails.
103    #[inline]
104    pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments>
105    where
106        F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
107        O: Unpin,
108    {
109        self.to_query().try_map(f)
110    }
111}
112impl<'c, 'e, DB> SqlTemplateExecute<DB>
113where
114    DB: Database,
115    'c: 'e,
116{
117    /// like sqlx_core::Query::execute
118    /// Execute the query and return the number of rows affected.
119    #[inline]
120    pub async fn execute<E>(self, executor: E) -> Result<DB::QueryResult, Error>
121    where
122        E: Executor<'c, Database = DB>,
123    {
124        executor.execute(self).await
125    }
126    /// like    sqlx_core::Query::execute_many
127    /// Execute multiple queries and return the rows affected from each query, in a stream.
128    #[inline]
129    pub fn execute_many<E>(self, executor: E) -> BoxStream<'e, Result<DB::QueryResult, Error>>
130    where
131        E: Executor<'c, Database = DB>,
132    {
133        #[allow(deprecated)]
134        executor.execute_many(self)
135    }
136    /// like sqlx_core::Query::fetch
137    /// Execute the query and return the generated results as a stream.
138    #[inline]
139    pub fn fetch<E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
140    where
141        E: Executor<'c, Database = DB>,
142    {
143        executor.fetch(self)
144    }
145    /// like sqlx_core::Query::fetch_many
146    /// Execute multiple queries and return the generated results as a stream.
147    ///
148    /// For each query in the stream, any generated rows are returned first,
149    /// then the `QueryResult` with the number of rows affected.
150    #[inline]
151    #[allow(clippy::type_complexity)]
152    pub fn fetch_many<E>(
153        self,
154        executor: E,
155    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
156    where
157        E: Executor<'c, Database = DB>,
158    {
159        #[allow(deprecated)]
160        executor.fetch_many(self)
161    }
162    /// like sqlx_core::Query::fetch_all
163    /// Execute the query and return all the resulting rows collected into a [`Vec`].
164    ///
165    /// ### Note: beware result set size.
166    /// This will attempt to collect the full result set of the query into memory.
167    ///
168    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
169    /// e.g. using `LIMIT`.
170    #[inline]
171    pub async fn fetch_all<E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
172    where
173        E: Executor<'c, Database = DB>,
174    {
175        executor.fetch_all(self).await
176    }
177    /// like sqlx_core::Query::fetch_one
178    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
179    ///
180    /// ### Note: for best performance, ensure the query returns at most one row.
181    /// Depending on the driver implementation, if your query can return more than one row,
182    /// it may lead to wasted CPU time and bandwidth on the database server.
183    ///
184    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
185    /// can result in a more optimal query plan.
186    ///
187    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
188    ///
189    /// Otherwise, you might want to add `LIMIT 1` to your query.
190    #[inline]
191    pub async fn fetch_one<E>(self, executor: E) -> Result<DB::Row, Error>
192    where
193        E: Executor<'c, Database = DB>,
194    {
195        executor.fetch_one(self).await
196    }
197    /// like sqlx_core::Query::fetch_optional
198    /// Execute the query, returning the first row or `None` otherwise.
199    ///
200    /// ### Note: for best performance, ensure the query returns at most one row.
201    /// Depending on the driver implementation, if your query can return more than one row,
202    /// it may lead to wasted CPU time and bandwidth on the database server.
203    ///
204    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
205    /// can result in a more optimal query plan.
206    ///
207    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
208    ///
209    /// Otherwise, you might want to add `LIMIT 1` to your query.
210    #[inline]
211    pub async fn fetch_optional<E>(self, executor: E) -> Result<Option<DB::Row>, Error>
212    where
213        E: Executor<'c, Database = DB>,
214    {
215        executor.fetch_optional(self).await
216    }
217
218    // QueryAs functions wrap
219
220    /// like sqlx_core::QueryAs::fetch
221    /// Execute the query and return the generated results as a stream.
222    pub fn fetch_as<O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
223    where
224        E: 'e + Executor<'c, Database = DB>,
225        DB: 'e,
226        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
227    {
228        self.fetch_many_as(executor)
229            .try_filter_map(|step| async move { Ok(step.right()) })
230            .boxed()
231    }
232    /// like sqlx_core::QueryAs::fetch_many
233    /// Execute multiple queries and return the generated results as a stream
234    /// from each query, in a stream.
235    pub fn fetch_many_as<O, E>(
236        self,
237        executor: E,
238    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
239    where
240        E: 'e + Executor<'c, Database = DB>,
241        DB: 'e,
242        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
243    {
244        #[allow(deprecated)]
245        executor
246            .fetch_many(self)
247            .map(|v| match v {
248                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
249                Ok(Either::Left(v)) => Ok(Either::Left(v)),
250                Err(e) => Err(e),
251            })
252            .boxed()
253    }
254    /// like sqlx_core::QueryAs::fetch_all
255    /// Execute the query and return all the resulting rows collected into a [`Vec`].
256    ///
257    /// ### Note: beware result set size.
258    /// This will attempt to collect the full result set of the query into memory.
259    ///
260    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
261    /// e.g. using `LIMIT`.
262    #[inline]
263    pub async fn fetch_all_as<O, E>(self, executor: E) -> Result<Vec<O>, Error>
264    where
265        E: 'e + Executor<'c, Database = DB>,
266        DB: 'e,
267        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
268    {
269        self.fetch_as(executor).try_collect().await
270    }
271    /// like sqlx_core::QueryAs::fetch_one
272    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
273    ///
274    /// ### Note: for best performance, ensure the query returns at most one row.
275    /// Depending on the driver implementation, if your query can return more than one row,
276    /// it may lead to wasted CPU time and bandwidth on the database server.
277    ///
278    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
279    /// can result in a more optimal query plan.
280    ///
281    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
282    ///
283    /// Otherwise, you might want to add `LIMIT 1` to your query.
284    pub async fn fetch_one_as<O, E>(self, executor: E) -> Result<O, Error>
285    where
286        E: 'e + Executor<'c, Database = DB>,
287        DB: 'e,
288        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
289    {
290        self.fetch_optional_as(executor)
291            .await
292            .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
293    }
294    /// like sqlx_core::QueryAs::fetch_optional
295    /// Execute the query, returning the first row or `None` otherwise.
296    ///
297    /// ### Note: for best performance, ensure the query returns at most one row.
298    /// Depending on the driver implementation, if your query can return more than one row,
299    /// it may lead to wasted CPU time and bandwidth on the database server.
300    ///
301    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
302    /// can result in a more optimal query plan.
303    ///
304    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
305    ///
306    /// Otherwise, you might want to add `LIMIT 1` to your query.
307    pub async fn fetch_optional_as<O, E>(self, executor: E) -> Result<Option<O>, Error>
308    where
309        E: 'e + Executor<'c, Database = DB>,
310        DB: 'e,
311        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
312    {
313        let row = executor.fetch_optional(self).await?;
314        if let Some(row) = row {
315            O::from_row(&row).map(Some)
316        } else {
317            Ok(None)
318        }
319    }
320}
321
322impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<DB> {
323    /// Returns the SQL query string
324    #[inline]
325    fn sql(self) -> SqlStr {
326        tracing::debug!("Executing SQL: {}", self.sql);
327        AssertSqlSafe(self.sql).into_sql_str()
328    }
329
330    /// Gets prepared statement (not supported in this implementation)
331    #[inline]
332    fn statement(&self) -> Option<&DB::Statement> {
333        None
334    }
335
336    /// Takes ownership of the bound arguments
337    #[inline]
338    fn take_arguments(&mut self) -> Result<Option<DB::Arguments>, sqlx_core::error::BoxDynError> {
339        Ok(self.arguments.take())
340    }
341
342    /// Checks if query is persistent
343    #[inline]
344    fn persistent(&self) -> bool {
345        self.persistent
346    }
347}