sqlx_askama_template/v3/
template_adapter.rs

1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::SqlTemplate;
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10    pool::PoolConnection, try_stream, types::Type,
11};
12
13use super::{
14    db_type::{DBBackend, DBType},
15    sql_templte_execute::SqlTemplateExecute,
16};
17
18/// Pagination metadata container
19#[derive(Debug)]
20pub struct PageInfo {
21    /// Total number of records
22    pub total: i64,
23    /// Records per page
24    pub page_size: i64,
25    /// Calculated page count
26    pub page_count: i64,
27}
28
29impl PageInfo {
30    /// Constructs new PageInfo with automatic page count calculation
31    ///
32    /// # Arguments
33    /// * `total` - Total records in dataset
34    /// * `page_size` - Desired records per page
35    pub fn new(total: i64, page_size: i64) -> PageInfo {
36        let mut page_count = total / page_size;
37        if total % page_size > 0 {
38            page_count += 1;
39        }
40        Self {
41            total,
42            page_size,
43            page_count,
44        }
45    }
46}
47/// Database adapter manager handling SQL rendering and execution
48///
49/// # Generic Parameters
50/// - `'q`: Query lifetime
51/// - `DB`: Database type
52/// - `T`: SQL template type
53pub struct DBAdapterManager<'q, DB, T>
54where
55    DB: Database,
56    T: SqlTemplate<'q, DB>,
57{
58    pub(crate) sql_buff: &'q mut String,
59    pub(crate) template: T,
60    persistent: bool,
61    _p: PhantomData<&'q DB>,
62    page_size: Option<i64>,
63    page_no: Option<i64>,
64}
65impl<'q, DB, T> DBAdapterManager<'q, DB, T>
66where
67    DB: Database,
68    T: SqlTemplate<'q, DB>,
69{
70    /// Creates new adapter with SQL buffer
71    ///
72    /// # Arguments
73    /// * `template` - SQL template instance
74    /// * `sql_buff` - Mutable reference to SQL string buffer
75    pub fn new(template: T, sql_buff: &'q mut String) -> Self {
76        Self {
77            sql_buff,
78            template,
79            persistent: true,
80            page_no: None,
81            page_size: None,
82            _p: PhantomData,
83        }
84    }
85}
86impl<'q, DB, T> DBAdapterManager<'q, DB, T>
87where
88    DB: Database,
89    T: SqlTemplate<'q, DB>,
90    i64: Encode<'q, DB> + Type<DB>,
91    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
92{
93    /// Configures query persistence (default: true)
94    pub fn set_persistent(mut self, persistent: bool) -> Self {
95        self.persistent = persistent;
96        self
97    }
98    /// Executes count query for pagination
99    ///
100    /// # Arguments
101    /// * `db_adapter` - Database connection adapter
102    #[inline]
103    pub async fn count<'c, Adapter>(self, db_adapter: Adapter) -> Result<i64, Error>
104    where
105        Adapter: DBBackend<'c, DB>,
106        (i64,): for<'r> FromRow<'r, DB::Row>,
107        'q: 'c,
108    {
109        let sql_buff = self.sql_buff;
110        let (mut sql, arg, db_type, executor) =
111            Self::render_adapter_sql(self.template, db_adapter, None, None).await?;
112
113        db_type.write_count_sql(&mut sql);
114        *sql_buff = sql;
115        let execute = SqlTemplateExecute::new(&*sql_buff, arg).set_persistent(self.persistent);
116        match executor {
117            Either::Left(conn) => {
118                let (count,): (i64,) = execute.fetch_one_as(conn).await?;
119
120                Ok(count)
121            }
122            Either::Right(mut conn) => {
123                let (count,): (i64,) = execute.fetch_one_as(&mut *conn).await?;
124
125                Ok(count)
126            }
127        }
128    }
129    /// Calculates complete pagination metadata
130    ///
131    /// # Arguments
132    /// * `page_size` - Records per page
133    /// * `db_adapter` - Database connection adapter
134    #[inline]
135    pub async fn count_page<'c, Adapter>(
136        self,
137
138        page_size: i64,
139        db_adapter: Adapter,
140    ) -> Result<PageInfo, Error>
141    where
142        Adapter: DBBackend<'c, DB>,
143        (i64,): for<'r> FromRow<'r, DB::Row>,
144        'q: 'c,
145    {
146        let count = self.count(db_adapter).await?;
147
148        Ok(PageInfo::new(count, page_size))
149    }
150    /// Sets pagination parameters
151    pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
152        self.page_no = Some(page_no);
153        self.page_size = Some(page_size);
154        self
155    }
156    /// Core SQL rendering method with pagination support
157    #[inline]
158    pub async fn render_adapter_sql<'c, Adapter>(
159        template: T,
160
161        db_adapter: Adapter,
162        page_no: Option<i64>,
163        page_size: Option<i64>,
164    ) -> Result<
165        (
166            String,
167            Option<DB::Arguments<'q>>,
168            DBType,
169            Either<impl Executor<'c, Database = DB>, PoolConnection<DB>>,
170        ),
171        Error,
172    >
173    where
174        'q: 'c,
175        Adapter: DBBackend<'c, DB>,
176    {
177        let (db_type, executor) = db_adapter.backend_db().await?;
178        let f = db_type.get_encode_placeholder_fn();
179        let mut sql = String::new();
180        let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
181
182        if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
183            let mut args = arg.unwrap_or_default();
184            db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
185            arg = Some(args);
186        }
187        Ok((sql, arg, db_type, executor))
188    }
189
190    /// like sqlx::Query::execute
191    /// Execute the query and return the number of rows affected.
192    #[inline]
193    pub async fn execute<'c, 'e, Adapter>(
194        self,
195        db_adapter: Adapter,
196    ) -> Result<DB::QueryResult, Error>
197    where
198        'q: 'c,
199        Adapter: DBBackend<'c, DB>,
200    {
201        self.execute_many(db_adapter).await.try_collect().await
202    }
203    /// like    sqlx::Query::execute_many
204    /// Execute multiple queries and return the rows affected from each query, in a stream.
205    #[inline]
206    pub async fn execute_many<'c, 'e, Adapter>(
207        self,
208
209        db_adapter: Adapter,
210    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
211    where
212        'q: 'c,
213        'c: 'e,
214        Adapter: DBBackend<'c, DB>,
215    {
216        self.fetch_many(db_adapter)
217            .await
218            .try_filter_map(|step| async move {
219                Ok(match step {
220                    Either::Left(rows) => Some(rows),
221                    Either::Right(_) => None,
222                })
223            })
224            .boxed()
225    }
226    /// like sqlx::Query::fetch
227    /// Execute the query and return the generated results as a stream.
228    #[inline]
229    pub async fn fetch<'c, 'e, Adapter>(
230        self,
231
232        db_adapter: Adapter,
233    ) -> BoxStream<'e, Result<DB::Row, Error>>
234    where
235        'q: 'c,
236        'c: 'e,
237        Adapter: DBBackend<'c, DB>,
238    {
239        self.fetch_many(db_adapter)
240            .await
241            .try_filter_map(|step| async move {
242                Ok(match step {
243                    Either::Left(_) => None,
244                    Either::Right(row) => Some(row),
245                })
246            })
247            .boxed()
248    }
249    /// like sqlx::Query::fetch_many
250    /// Execute multiple queries and return the generated results as a stream.
251    ///
252    /// For each query in the stream, any generated rows are returned first,
253    /// then the `QueryResult` with the number of rows affected.
254    #[inline]
255    #[allow(clippy::type_complexity)]
256    pub async fn fetch_many<'c, 'e, Adapter>(
257        self,
258        db_adapter: Adapter,
259    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
260    where
261        'q: 'c,
262        'c: 'e,
263        Adapter: DBBackend<'c, DB>,
264    {
265        let sql_buff = self.sql_buff;
266        let res =
267            Self::render_adapter_sql(self.template, db_adapter, self.page_no, self.page_size).await;
268
269        match res {
270            Ok((sql, arg, _db_type, executor)) => {
271                *sql_buff = sql;
272                let execute =
273                    SqlTemplateExecute::new(&*sql_buff, arg).set_persistent(self.persistent);
274
275                match executor {
276                    Either::Left(conn) => execute.fetch_many(conn),
277                    Either::Right(mut conn) => Box::pin(try_stream! {
278
279                        let mut s = conn.fetch_many(execute);
280
281                        while let Some(v) = s.try_next().await? {
282                            r#yield!(v);
283                        }
284
285                        Ok(())
286                    }),
287                }
288            }
289            Err(e) => stream::once(async move { Err(e) }).boxed(),
290        }
291    }
292
293    /// like sqlx::Query::fetch_all
294    /// Execute the query and return all the resulting rows collected into a [`Vec`].
295    ///
296    /// ### Note: beware result set size.
297    /// This will attempt to collect the full result set of the query into memory.
298    ///
299    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
300    /// e.g. using `LIMIT`.
301    #[inline]
302    pub async fn fetch_all<'c, Adapter>(self, db_adapter: Adapter) -> Result<Vec<DB::Row>, Error>
303    where
304        'q: 'c,
305        Adapter: DBBackend<'c, DB>,
306    {
307        self.fetch(db_adapter).await.try_collect().await
308    }
309    /// like sqlx::Query::fetch_one
310    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
311    ///
312    /// ### Note: for best performance, ensure the query returns at most one row.
313    /// Depending on the driver implementation, if your query can return more than one row,
314    /// it may lead to wasted CPU time and bandwidth on the database server.
315    ///
316    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
317    /// can result in a more optimal query plan.
318    ///
319    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
320    ///
321    /// Otherwise, you might want to add `LIMIT 1` to your query.
322    #[inline]
323    pub async fn fetch_one<'c, Adapter>(self, db_adapter: Adapter) -> Result<DB::Row, Error>
324    where
325        'q: 'c,
326        Adapter: DBBackend<'c, DB>,
327    {
328        self.fetch_optional(db_adapter)
329            .await
330            .and_then(|row| match row {
331                Some(row) => Ok(row),
332                None => Err(Error::RowNotFound),
333            })
334    }
335    /// like sqlx::Query::fetch_optional
336    /// Execute the query, returning the first row or `None` otherwise.
337    ///
338    /// ### Note: for best performance, ensure the query returns at most one row.
339    /// Depending on the driver implementation, if your query can return more than one row,
340    /// it may lead to wasted CPU time and bandwidth on the database server.
341    ///
342    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
343    /// can result in a more optimal query plan.
344    ///
345    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
346    ///
347    /// Otherwise, you might want to add `LIMIT 1` to your query.
348    #[inline]
349    pub async fn fetch_optional<'c, Adapter>(
350        self,
351
352        db_adapter: Adapter,
353    ) -> Result<Option<DB::Row>, Error>
354    where
355        'q: 'c,
356        Adapter: DBBackend<'c, DB>,
357    {
358        self.fetch(db_adapter).await.try_next().await
359    }
360
361    /// like sqlx::QueryAs::fetch
362    /// Execute the query and return the generated results as a stream.
363    pub async fn fetch_as<'c, 'e, Adapter, O>(
364        self,
365
366        db_adapter: Adapter,
367    ) -> BoxStream<'e, Result<O, Error>>
368    where
369        'q: 'c,
370        'c: 'e,
371        Adapter: DBBackend<'c, DB>,
372        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
373    {
374        self.fetch_many_as(db_adapter)
375            .await
376            .try_filter_map(|step| async move { Ok(step.right()) })
377            .boxed()
378    }
379    /// like sqlx::QueryAs::fetch_many
380    /// Execute multiple queries and return the generated results as a stream
381    /// from each query, in a stream.
382    pub async fn fetch_many_as<'c, 'e, Adapter, O>(
383        self,
384
385        db_adapter: Adapter,
386    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
387    where
388        'q: 'c,
389        'c: 'e,
390        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
391        Adapter: DBBackend<'c, DB>,
392    {
393        self.fetch_many(db_adapter)
394            .await
395            .map(|v| match v {
396                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
397                Ok(Either::Left(v)) => Ok(Either::Left(v)),
398                Err(e) => Err(e),
399            })
400            .boxed()
401    }
402    /// like sqlx::QueryAs::fetch_all
403    /// Execute the query and return all the resulting rows collected into a [`Vec`].
404    ///
405    /// ### Note: beware result set size.
406    /// This will attempt to collect the full result set of the query into memory.
407    ///
408    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
409    /// e.g. using `LIMIT`.
410    #[inline]
411    pub async fn fetch_all_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<Vec<O>, Error>
412    where
413        'q: 'c,
414        Adapter: DBBackend<'c, DB>,
415        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
416    {
417        self.fetch_as(db_adapter).await.try_collect().await
418    }
419    /// like sqlx::QueryAs::fetch_one
420    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
421    ///
422    /// ### Note: for best performance, ensure the query returns at most one row.
423    /// Depending on the driver implementation, if your query can return more than one row,
424    /// it may lead to wasted CPU time and bandwidth on the database server.
425    ///
426    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
427    /// can result in a more optimal query plan.
428    ///
429    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
430    ///
431    /// Otherwise, you might want to add `LIMIT 1` to your query.
432    pub async fn fetch_one_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<O, Error>
433    where
434        'q: 'c,
435        Adapter: DBBackend<'c, DB>,
436        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
437    {
438        self.fetch_optional_as(db_adapter)
439            .await
440            .and_then(|row| row.ok_or(Error::RowNotFound))
441    }
442    /// like sqlx::QueryAs::fetch_optional
443    /// Execute the query, returning the first row or `None` otherwise.
444    ///
445    /// ### Note: for best performance, ensure the query returns at most one row.
446    /// Depending on the driver implementation, if your query can return more than one row,
447    /// it may lead to wasted CPU time and bandwidth on the database server.
448    ///
449    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
450    /// can result in a more optimal query plan.
451    ///
452    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
453    ///
454    /// Otherwise, you might want to add `LIMIT 1` to your query.
455    pub async fn fetch_optional_as<'c, Adapter, O>(
456        self,
457
458        db_adapter: Adapter,
459    ) -> Result<Option<O>, Error>
460    where
461        'q: 'c,
462        Adapter: DBBackend<'c, DB>,
463        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
464    {
465        let row = self.fetch_optional(db_adapter).await?;
466        if let Some(row) = row {
467            O::from_row(&row).map(Some)
468        } else {
469            Ok(None)
470        }
471    }
472}