Skip to main content

sqlx_askama_template/v3/
template_adapter.rs

1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::{AdapterExecutor, DBType, SqlTemplate};
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10    types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
14
/// Pagination metadata container
#[derive(Debug)]
pub struct PageInfo {
    /// Total number of records
    pub total: i64,
    /// Records per page
    pub page_size: i64,
    /// Calculated page count
    pub page_count: i64,
}

impl PageInfo {
    /// Constructs new PageInfo with automatic page count calculation
    ///
    /// The page count is `total / page_size`, rounded up when a partial
    /// page remains. A non-positive `page_size` cannot describe any page,
    /// so it yields `page_count == 0` instead of panicking on a division
    /// by zero (or overflowing on `i64::MIN / -1`).
    ///
    /// # Arguments
    /// * `total` - Total records in dataset
    /// * `page_size` - Desired records per page
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if page_size <= 0 {
            // Guard: `/` and `%` below would panic for 0 and are meaningless
            // for negative sizes.
            0
        } else {
            // Integer ceiling division: add one page for a positive remainder.
            total / page_size + i64::from(total % page_size > 0)
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
/// Database adapter manager handling SQL rendering and execution
///
/// Holds a SQL template together with the options (persistence flag,
/// optional pagination) that are applied when the template is rendered
/// and executed against a backend.
///
/// # Generic Parameters
/// - `'q`: Query lifetime
/// - `DB`: Database type
/// - `T`: SQL template type
pub struct DBAdapterManager<'q, DB, T>
where
    DB: Database,
    T: SqlTemplate<'q, DB>,
{
    /// SQL text buffer; starts empty and is overwritten with the rendered
    /// statement each time a query method renders the template.
    pub(crate) sql: String,
    /// Template that is cloned and rendered for every query.
    pub(crate) template: T,
    /// Forwarded to `SqlTemplateExecute::set_persistent`; `new` sets it to `true`.
    persistent: bool,
    /// Zero-sized marker tying the otherwise unused `'q` and `DB` parameters to the type.
    _p: PhantomData<&'q DB>,
    /// Records per page; `None` until `set_page` is called.
    page_size: Option<i64>,
    /// Requested page number; `None` until `set_page` is called.
    page_no: Option<i64>,
}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65    DB: Database,
66    T: SqlTemplate<'q, DB>,
67{
68    /// Creates new adapter with SQL buffer
69    ///
70    /// # Arguments
71    /// * `template` - SQL template instance
72    pub fn new(template: T) -> Self {
73        Self {
74            sql: String::new(),
75            template,
76            persistent: true,
77            page_no: None,
78            page_size: None,
79            _p: PhantomData,
80        }
81    }
82
83    pub fn sql(&self) -> &String {
84        &self.sql
85    }
86}
87impl<'q, DB, T> DBAdapterManager<'q, DB, T>
88where
89    DB: Database,
90    T: SqlTemplate<'q, DB>,
91    i64: Encode<'q, DB> + Type<DB>,
92{
93    /// Configures query persistence (default: true)
94    pub fn set_persistent(mut self, persistent: bool) -> Self {
95        self.persistent = persistent;
96        self
97    }
98    /// Executes count query for pagination
99    ///
100    /// # Arguments
101    /// * `db_adapter` - Database connection adapter
102    #[inline]
103    pub async fn count<'c, C, Adapter>(&'q mut self, db_adapter: Adapter) -> Result<i64, Error>
104    where
105        C: Executor<'c, Database = DB> + 'c,
106        Adapter: BackendDB<'c, DB, C>,
107        (i64,): for<'r> FromRow<'r, DB::Row>,
108        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
109    {
110        let (mut sql, arg, db_type, executor) = Self::render_sql_with_adapter(
111            self.template.clone(),
112            db_adapter,
113            self.page_no,
114            self.page_size,
115        )
116        .await?;
117
118        db_type.write_count_sql(&mut sql);
119        self.sql = sql;
120        let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
121        let (count,): (i64,) = execute.fetch_one_as(executor).await?;
122        Ok(count)
123    }
124    /// Calculates complete pagination metadata
125    ///
126    /// # Arguments
127    /// * `page_size` - Records per page
128    /// * `db_adapter` - Database connection adapter
129    #[inline]
130    pub async fn count_page<'c, C, Adapter>(
131        &'q mut self,
132
133        page_size: i64,
134        db_adapter: Adapter,
135    ) -> Result<PageInfo, Error>
136    where
137        C: Executor<'c, Database = DB> + 'c,
138        Adapter: BackendDB<'c, DB, C>,
139        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
140        (i64,): for<'r> FromRow<'r, DB::Row>,
141    {
142        let count = self.count(db_adapter).await?;
143
144        Ok(PageInfo::new(count, page_size))
145    }
146    /// Sets pagination parameters
147    pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
148        self.page_no = Some(page_no);
149        self.page_size = Some(page_size);
150        self
151    }
152    /// Core SQL rendering method with pagination support
153    #[inline]
154    pub async fn render_sql_with_adapter<'c, C, Adapter>(
155        template: T,
156
157        db_adapter: Adapter,
158        page_no: Option<i64>,
159        page_size: Option<i64>,
160    ) -> Result<
161        (
162            String,
163            Option<DB::Arguments<'q>>,
164            DBType,
165            AdapterExecutor<'c, DB, C>,
166        ),
167        Error,
168    >
169    where
170        C: Executor<'c, Database = DB> + 'c,
171        Adapter: BackendDB<'c, DB, C>,
172    {
173        let (db_type, executor) = db_adapter.backend_db().await?;
174        let f = db_type.get_encode_placeholder_fn();
175        let mut sql = String::new();
176        let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
177
178        if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
179            let mut args = arg.unwrap_or_default();
180            db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
181            arg = Some(args);
182        }
183        Ok((sql, arg, db_type, executor))
184    }
185
186    /// like sqlx::Query::execute
187    /// Execute the query and return the number of rows affected.
188    #[inline]
189    pub async fn execute<'c, C, Adapter>(
190        &'q mut self,
191        db_adapter: Adapter,
192    ) -> Result<DB::QueryResult, Error>
193    where
194        C: Executor<'c, Database = DB> + 'c,
195        Adapter: BackendDB<'c, DB, C>,
196        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
197    {
198        self.execute_many(db_adapter).await.try_collect().await
199    }
200    /// like    sqlx::Query::execute_many
201    /// Execute multiple queries and return the rows affected from each query, in a stream.
202    #[inline]
203    pub async fn execute_many<'c, 'e, C, Adapter>(
204        &'q mut self,
205
206        db_adapter: Adapter,
207    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
208    where
209        'c: 'e,
210        'q: 'e,
211        C: Executor<'c, Database = DB> + 'c,
212        Adapter: BackendDB<'c, DB, C>,
213        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
214    {
215        self.fetch_many(db_adapter)
216            .await
217            .try_filter_map(|step| async move {
218                Ok(match step {
219                    Either::Left(rows) => Some(rows),
220                    Either::Right(_) => None,
221                })
222            })
223            .boxed()
224    }
225    /// like sqlx::Query::fetch
226    /// Execute the query and return the generated results as a stream.
227    #[inline]
228    pub async fn fetch<'c, 'e, C, Adapter>(
229        &'q mut self,
230
231        db_adapter: Adapter,
232    ) -> BoxStream<'e, Result<DB::Row, Error>>
233    where
234        C: Executor<'c, Database = DB> + 'c,
235        Adapter: BackendDB<'c, DB, C>,
236        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
237        'c: 'e,
238        'q: 'e,
239    {
240        self.fetch_many(db_adapter)
241            .await
242            .try_filter_map(|step| async move {
243                Ok(match step {
244                    Either::Left(_) => None,
245                    Either::Right(row) => Some(row),
246                })
247            })
248            .boxed()
249    }
250    /// like sqlx::Query::fetch_many
251    /// Execute multiple queries and return the generated results as a stream.
252    ///
253    /// For each query in the stream, any generated rows are returned first,
254    /// then the `QueryResult` with the number of rows affected.
255    #[inline]
256    #[allow(clippy::type_complexity)]
257    pub async fn fetch_many<'c, 'e, C, Adapter>(
258        &'q mut self,
259        db_adapter: Adapter,
260    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
261    where
262        'c: 'e,
263        'q: 'e,
264        C: Executor<'c, Database = DB> + 'c,
265        Adapter: BackendDB<'c, DB, C>,
266        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
267    {
268        let res = Self::render_sql_with_adapter(
269            self.template.clone(),
270            db_adapter,
271            self.page_no,
272            self.page_size,
273        )
274        .await;
275
276        match res {
277            Ok((sql, arg, _db_type, executor)) => {
278                self.sql = sql;
279                let execute =
280                    SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
281                execute.fetch_many(executor)
282            }
283            Err(e) => stream::once(async move { Err(e) }).boxed(),
284        }
285    }
286
287    /// like sqlx::Query::fetch_all
288    /// Execute the query and return all the resulting rows collected into a [`Vec`].
289    ///
290    /// ### Note: beware result set size.
291    /// This will attempt to collect the full result set of the query into memory.
292    ///
293    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
294    /// e.g. using `LIMIT`.
295    #[inline]
296    pub async fn fetch_all<'c, C, Adapter>(
297        &'q mut self,
298        db_adapter: Adapter,
299    ) -> Result<Vec<DB::Row>, Error>
300    where
301        C: Executor<'c, Database = DB> + 'c,
302        Adapter: BackendDB<'c, DB, C>,
303        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
304    {
305        self.fetch(db_adapter).await.try_collect().await
306    }
307    /// like sqlx::Query::fetch_one
308    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
309    ///
310    /// ### Note: for best performance, ensure the query returns at most one row.
311    /// Depending on the driver implementation, if your query can return more than one row,
312    /// it may lead to wasted CPU time and bandwidth on the database server.
313    ///
314    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
315    /// can result in a more optimal query plan.
316    ///
317    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
318    ///
319    /// Otherwise, you might want to add `LIMIT 1` to your query.
320    #[inline]
321    pub async fn fetch_one<'c, C, Adapter>(
322        &'q mut self,
323        db_adapter: Adapter,
324    ) -> Result<DB::Row, Error>
325    where
326        C: Executor<'c, Database = DB> + 'c,
327        Adapter: BackendDB<'c, DB, C>,
328        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
329    {
330        self.fetch_optional(db_adapter)
331            .await
332            .and_then(|row| match row {
333                Some(row) => Ok(row),
334                None => Err(Error::RowNotFound),
335            })
336    }
337    /// like sqlx::Query::fetch_optional
338    /// Execute the query, returning the first row or `None` otherwise.
339    ///
340    /// ### Note: for best performance, ensure the query returns at most one row.
341    /// Depending on the driver implementation, if your query can return more than one row,
342    /// it may lead to wasted CPU time and bandwidth on the database server.
343    ///
344    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
345    /// can result in a more optimal query plan.
346    ///
347    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
348    ///
349    /// Otherwise, you might want to add `LIMIT 1` to your query.
350    #[inline]
351    pub async fn fetch_optional<'c, C, Adapter>(
352        &'q mut self,
353
354        db_adapter: Adapter,
355    ) -> Result<Option<DB::Row>, Error>
356    where
357        C: Executor<'c, Database = DB> + 'c,
358        Adapter: BackendDB<'c, DB, C>,
359        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
360    {
361        self.fetch(db_adapter).await.try_next().await
362    }
363
364    /// like sqlx::QueryAs::fetch
365    /// Execute the query and return the generated results as a stream.
366    pub async fn fetch_as<'c, 'e, C, Adapter, O>(
367        &'q mut self,
368
369        db_adapter: Adapter,
370    ) -> BoxStream<'e, Result<O, Error>>
371    where
372        'c: 'e,
373        'q: 'e,
374        C: Executor<'c, Database = DB> + 'c,
375        Adapter: BackendDB<'c, DB, C>,
376        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
377        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
378    {
379        self.fetch_many_as(db_adapter)
380            .await
381            .try_filter_map(|step| async move { Ok(step.right()) })
382            .boxed()
383    }
384    /// like sqlx::QueryAs::fetch_many
385    /// Execute multiple queries and return the generated results as a stream
386    /// from each query, in a stream.
387    pub async fn fetch_many_as<'c, 'e, C, Adapter, O>(
388        &'q mut self,
389        db_adapter: Adapter,
390    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
391    where
392        'c: 'e,
393        'q: 'e,
394        C: Executor<'c, Database = DB> + 'c,
395        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
396        Adapter: BackendDB<'c, DB, C>,
397        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
398    {
399        self.fetch_many(db_adapter)
400            .await
401            .map(|v| match v {
402                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
403                Ok(Either::Left(v)) => Ok(Either::Left(v)),
404                Err(e) => Err(e),
405            })
406            .boxed()
407    }
408    /// like sqlx::QueryAs::fetch_all
409    /// Execute the query and return all the resulting rows collected into a [`Vec`].
410    ///
411    /// ### Note: beware result set size.
412    /// This will attempt to collect the full result set of the query into memory.
413    ///
414    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
415    /// e.g. using `LIMIT`.
416    #[inline]
417    pub async fn fetch_all_as<'c, C, Adapter, O>(
418        &'q mut self,
419        db_adapter: Adapter,
420    ) -> Result<Vec<O>, Error>
421    where
422        C: Executor<'c, Database = DB> + 'c,
423        Adapter: BackendDB<'c, DB, C>,
424        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
425        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
426    {
427        self.fetch_as(db_adapter).await.try_collect().await
428    }
429    /// like sqlx::QueryAs::fetch_one
430    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
431    ///
432    /// ### Note: for best performance, ensure the query returns at most one row.
433    /// Depending on the driver implementation, if your query can return more than one row,
434    /// it may lead to wasted CPU time and bandwidth on the database server.
435    ///
436    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
437    /// can result in a more optimal query plan.
438    ///
439    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
440    ///
441    /// Otherwise, you might want to add `LIMIT 1` to your query.
442    pub async fn fetch_one_as<'c, C, Adapter, O>(
443        &'q mut self,
444        db_adapter: Adapter,
445    ) -> Result<O, Error>
446    where
447        C: Executor<'c, Database = DB> + 'c,
448        Adapter: BackendDB<'c, DB, C>,
449        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
450        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
451    {
452        self.fetch_optional_as(db_adapter)
453            .await
454            .and_then(|row| row.ok_or(Error::RowNotFound))
455    }
456    /// like sqlx::QueryAs::fetch_optional
457    /// Execute the query, returning the first row or `None` otherwise.
458    ///
459    /// ### Note: for best performance, ensure the query returns at most one row.
460    /// Depending on the driver implementation, if your query can return more than one row,
461    /// it may lead to wasted CPU time and bandwidth on the database server.
462    ///
463    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
464    /// can result in a more optimal query plan.
465    ///
466    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
467    ///
468    /// Otherwise, you might want to add `LIMIT 1` to your query.
469    pub async fn fetch_optional_as<'c, C, Adapter, O>(
470        &'q mut self,
471
472        db_adapter: Adapter,
473    ) -> Result<Option<O>, Error>
474    where
475        C: Executor<'c, Database = DB> + 'c,
476        Adapter: BackendDB<'c, DB, C>,
477        AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
478        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
479    {
480        let row = self.fetch_optional(db_adapter).await?;
481        if let Some(row) = row {
482            O::from_row(&row).map(Some)
483        } else {
484            Ok(None)
485        }
486    }
487}