Skip to main content

sqlx_askama_template/
sql_template_execute.rs

1use futures_core::stream::BoxStream;
2use futures_util::{StreamExt, TryStreamExt};
3use sqlx_core::{
4    Either, Error,
5    arguments::IntoArguments,
6    database::{Database, HasStatementCache},
7    executor::{Execute, Executor},
8    from_row::FromRow,
9    query::{Map, Query, query, query_with},
10    query_as::{QueryAs, query_as, query_as_with},
11    sql_str::{AssertSqlSafe, SqlSafeStr, SqlStr},
12};
13/// Internal executor for SQL templates
14pub struct SqlTemplateExecute<DB: Database> {
15    /// Reference to SQL query string
16    pub(crate) sql: String,
17    /// SQL parameters
18    pub(crate) arguments: Option<DB::Arguments>,
19    /// Persistent flag
20    pub(crate) persistent: bool,
21}
22impl<DB: Database> Clone for SqlTemplateExecute<DB>
23where
24    DB::Arguments: Clone,
25{
26    fn clone(&self) -> Self {
27        SqlTemplateExecute {
28            sql: self.sql.clone(),
29            arguments: self.arguments.clone(),
30            persistent: self.persistent,
31        }
32    }
33}
34impl<DB: Database> SqlTemplateExecute<DB> {
35    /// Creates a new SQL template executor
36    pub fn new(sql: String, arguments: Option<DB::Arguments>) -> Self {
37        SqlTemplateExecute {
38            sql,
39            arguments,
40            persistent: true,
41        }
42    }
43    /// If `true`, the statement will get prepared once and cached to the
44    /// connection's statement cache.
45    ///
46    /// If queried once with the flag set to `true`, all subsequent queries
47    /// matching the one with the flag will use the cached statement until the
48    /// cache is cleared.
49    ///
50    /// If `false`, the prepared statement will be closed after execution.
51    ///
52    /// Default: `true`.
53    pub fn set_persistent(mut self, persistent: bool) -> Self {
54        self.persistent = persistent;
55        self
56    }
57}
58impl<'q, DB> SqlTemplateExecute<DB>
59where
60    DB: Database + HasStatementCache,
61    DB::Arguments: IntoArguments<DB>,
62{
63    /// to sqlx_core::QueryAs
64    /// Converts the SQL template to a `QueryAs` object, which can be executed to fetch rows
65    #[inline]
66    pub fn to_query_as<O>(self) -> QueryAs<'q, DB, O, DB::Arguments>
67    where
68        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
69    {
70        let q = match self.arguments {
71            Some(args) => query_as_with(AssertSqlSafe(self.sql), args),
72            None => query_as(AssertSqlSafe(self.sql)),
73        };
74        q.persistent(self.persistent)
75    }
76    /// to sqlx_core::Query
77    /// Converts the SQL template to a `Query` object, which can be executed to fetch rows
78    #[inline]
79    pub fn to_query(self) -> Query<'q, DB, DB::Arguments> {
80        let q = match self.arguments {
81            Some(args) => {
82                //   let wrap = ArgWrapper(args);
83                query_with(AssertSqlSafe(self.sql), args)
84            }
85            None => query(AssertSqlSafe(self.sql)),
86        };
87        q.persistent(self.persistent)
88    }
89    /// like sqlx_core::Query::map
90    /// Map each row in the result to another type.
91    #[inline]
92    pub fn map<F, O>(
93        self,
94        f: F,
95    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send, DB::Arguments>
96    where
97        F: FnMut(DB::Row) -> O + Send,
98        O: Unpin,
99    {
100        self.to_query().map(f)
101    }
102
103    /// like sqlx_core::Query::try_map
104    /// Map each row in the result to another type, returning an error if the mapping fails.
105    #[inline]
106    pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, DB::Arguments>
107    where
108        F: FnMut(DB::Row) -> Result<O, sqlx_core::Error> + Send,
109        O: Unpin,
110    {
111        self.to_query().try_map(f)
112    }
113}
114impl<DB> SqlTemplateExecute<DB>
115where
116    DB: Database,
117{
118    /// like sqlx_core::Query::execute
119    /// Execute the query and return the number of rows affected.
120    #[inline]
121    pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
122    where
123        DB::Arguments: 'e,
124        E: Executor<'c, Database = DB>,
125    {
126        executor.execute(self).await
127    }
128    /// like    sqlx_core::Query::execute_many
129    /// Execute multiple queries and return the rows affected from each query, in a stream.
130    #[inline]
131    pub fn execute_many<'e, 'c: 'e, E>(
132        self,
133        executor: E,
134    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
135    where
136        DB::Arguments: 'e,
137        E: Executor<'c, Database = DB>,
138    {
139        #[allow(deprecated)]
140        executor.execute_many(self)
141    }
142    /// like sqlx_core::Query::fetch
143    /// Execute the query and return the generated results as a stream.
144    #[inline]
145    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
146    where
147        DB::Arguments: 'e,
148        E: Executor<'c, Database = DB>,
149    {
150        executor.fetch(self)
151    }
152    /// like sqlx_core::Query::fetch_many
153    /// Execute multiple queries and return the generated results as a stream.
154    ///
155    /// For each query in the stream, any generated rows are returned first,
156    /// then the `QueryResult` with the number of rows affected.
157    #[inline]
158    #[allow(clippy::type_complexity)]
159    pub fn fetch_many<'e, 'c: 'e, E>(
160        self,
161        executor: E,
162    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
163    where
164        DB::Arguments: 'e,
165        E: Executor<'c, Database = DB>,
166    {
167        #[allow(deprecated)]
168        executor.fetch_many(self)
169    }
170    /// like sqlx_core::Query::fetch_all
171    /// Execute the query and return all the resulting rows collected into a [`Vec`].
172    ///
173    /// ### Note: beware result set size.
174    /// This will attempt to collect the full result set of the query into memory.
175    ///
176    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
177    /// e.g. using `LIMIT`.
178    #[inline]
179    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
180    where
181        DB::Arguments: 'e,
182        E: Executor<'c, Database = DB>,
183    {
184        executor.fetch_all(self).await
185    }
186    /// like sqlx_core::Query::fetch_one
187    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
188    ///
189    /// ### Note: for best performance, ensure the query returns at most one row.
190    /// Depending on the driver implementation, if your query can return more than one row,
191    /// it may lead to wasted CPU time and bandwidth on the database server.
192    ///
193    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
194    /// can result in a more optimal query plan.
195    ///
196    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
197    ///
198    /// Otherwise, you might want to add `LIMIT 1` to your query.
199    #[inline]
200    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
201    where
202        DB::Arguments: 'e,
203        E: Executor<'c, Database = DB>,
204    {
205        executor.fetch_one(self).await
206    }
207    /// like sqlx_core::Query::fetch_optional
208    /// Execute the query, returning the first row or `None` otherwise.
209    ///
210    /// ### Note: for best performance, ensure the query returns at most one row.
211    /// Depending on the driver implementation, if your query can return more than one row,
212    /// it may lead to wasted CPU time and bandwidth on the database server.
213    ///
214    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
215    /// can result in a more optimal query plan.
216    ///
217    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
218    ///
219    /// Otherwise, you might want to add `LIMIT 1` to your query.
220    #[inline]
221    pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
222    where
223        DB::Arguments: 'e,
224        E: Executor<'c, Database = DB>,
225    {
226        executor.fetch_optional(self).await
227    }
228
229    // QueryAs functions wrapp
230
231    /// like sqlx_core::QueryAs::fetch
232    /// Execute the query and return the generated results as a stream.
233    pub fn fetch_as<'e, 'c: 'e, O, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
234    where
235        DB::Arguments: 'e,
236        E: 'e + Executor<'c, Database = DB>,
237        DB: 'e,
238        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
239    {
240        self.fetch_many_as(executor)
241            .try_filter_map(|step| async move { Ok(step.right()) })
242            .boxed()
243    }
244    /// like sqlx_core::QueryAs::fetch_many
245    /// Execute multiple queries and return the generated results as a stream
246    /// from each query, in a stream.
247    pub fn fetch_many_as<'e, 'c: 'e, O, E>(
248        self,
249        executor: E,
250    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
251    where
252        DB::Arguments: 'e,
253        E: 'e + Executor<'c, Database = DB>,
254        DB: 'e,
255        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
256    {
257        #[allow(deprecated)]
258        executor
259            .fetch_many(self)
260            .map(|v| match v {
261                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
262                Ok(Either::Left(v)) => Ok(Either::Left(v)),
263                Err(e) => Err(e),
264            })
265            .boxed()
266    }
267    /// like sqlx_core::QueryAs::fetch_all
268    /// Execute the query and return all the resulting rows collected into a [`Vec`].
269    ///
270    /// ### Note: beware result set size.
271    /// This will attempt to collect the full result set of the query into memory.
272    ///
273    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
274    /// e.g. using `LIMIT`.
275    #[inline]
276    pub async fn fetch_all_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Vec<O>, Error>
277    where
278        DB::Arguments: 'e,
279        E: 'e + Executor<'c, Database = DB>,
280        DB: 'e,
281        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
282    {
283        self.fetch_as(executor).try_collect().await
284    }
285    /// like sqlx_core::QueryAs::fetch_one
286    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
287    ///
288    /// ### Note: for best performance, ensure the query returns at most one row.
289    /// Depending on the driver implementation, if your query can return more than one row,
290    /// it may lead to wasted CPU time and bandwidth on the database server.
291    ///
292    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
293    /// can result in a more optimal query plan.
294    ///
295    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
296    ///
297    /// Otherwise, you might want to add `LIMIT 1` to your query.
298    pub async fn fetch_one_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<O, Error>
299    where
300        DB::Arguments: 'e,
301        E: 'e + Executor<'c, Database = DB>,
302        DB: 'e,
303        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
304    {
305        self.fetch_optional_as(executor)
306            .await
307            .and_then(|row| row.ok_or(sqlx_core::Error::RowNotFound))
308    }
309    /// like sqlx_core_core::QueryAs::fetch_optional
310    /// Execute the query, returning the first row or `None` otherwise.
311    ///
312    /// ### Note: for best performance, ensure the query returns at most one row.
313    /// Depending on the driver implementation, if your query can return more than one row,
314    /// it may lead to wasted CPU time and bandwidth on the database server.
315    ///
316    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
317    /// can result in a more optimal query plan.
318    ///
319    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
320    ///
321    /// Otherwise, you might want to add `LIMIT 1` to your query.
322    pub async fn fetch_optional_as<'e, 'c: 'e, O, E>(self, executor: E) -> Result<Option<O>, Error>
323    where
324        DB::Arguments: 'e,
325        E: 'e + Executor<'c, Database = DB>,
326        DB: 'e,
327        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
328    {
329        let row = executor.fetch_optional(self).await?;
330        if let Some(row) = row {
331            O::from_row(&row).map(Some)
332        } else {
333            Ok(None)
334        }
335    }
336}
337
338impl<'q, DB: Database> Execute<'q, DB> for SqlTemplateExecute<DB> {
339    /// Returns the SQL query string
340    #[inline]
341    fn sql(self) -> SqlStr {
342        tracing::debug!("Executing SQL: {}", self.sql);
343        AssertSqlSafe(self.sql).into_sql_str()
344    }
345
346    /// Gets prepared statement (not supported in this implementation)
347    #[inline]
348    fn statement(&self) -> Option<&DB::Statement> {
349        None
350    }
351
352    /// Takes ownership of the bound arguments
353    #[inline]
354    fn take_arguments(&mut self) -> Result<Option<DB::Arguments>, sqlx_core::error::BoxDynError> {
355        Ok(self.arguments.take())
356    }
357
358    /// Checks if query is persistent
359    #[inline]
360    fn persistent(&self) -> bool {
361        self.persistent
362    }
363}