sqlx_askama_template/v3/
template_adapter.rs

1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::SqlTemplate;
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10    types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
14
15/// Pagination metadata container
16#[derive(Debug)]
17pub struct PageInfo {
18    /// Total number of records
19    pub total: i64,
20    /// Records per page
21    pub page_size: i64,
22    /// Calculated page count
23    pub page_count: i64,
24}
25
26impl PageInfo {
27    /// Constructs new PageInfo with automatic page count calculation
28    ///
29    /// # Arguments
30    /// * `total` - Total records in dataset
31    /// * `page_size` - Desired records per page
32    pub fn new(total: i64, page_size: i64) -> PageInfo {
33        let mut page_count = total / page_size;
34        if total % page_size > 0 {
35            page_count += 1;
36        }
37        Self {
38            total,
39            page_size,
40            page_count,
41        }
42    }
43}
44/// Database adapter manager handling SQL rendering and execution
45///
46/// # Generic Parameters
47/// - `'q`: Query lifetime
48/// - `DB`: Database type
49/// - `T`: SQL template type
50pub struct DBAdapterManager<'q, DB, T>
51where
52    DB: Database,
53    T: SqlTemplate<'q, DB>,
54{
55    pub(crate) sql: String,
56    pub(crate) template: T,
57    persistent: bool,
58    _p: PhantomData<&'q DB>,
59    page_size: Option<i64>,
60    page_no: Option<i64>,
61}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65    DB: Database,
66    T: SqlTemplate<'q, DB>,
67{
68    /// Creates new adapter with SQL buffer
69    ///
70    /// # Arguments
71    /// * `template` - SQL template instance
72    pub fn new(template: T) -> Self {
73        Self {
74            sql: String::new(),
75            template,
76            persistent: true,
77            page_no: None,
78            page_size: None,
79            _p: PhantomData,
80        }
81    }
82
83    pub fn sql(&self) -> &String {
84        &self.sql
85    }
86}
87impl<'q, DB, T> DBAdapterManager<'q, DB, T>
88where
89    DB: Database,
90    T: SqlTemplate<'q, DB>,
91    i64: Encode<'q, DB> + Type<DB>,
92{
93    /// Configures query persistence (default: true)
94    pub fn set_persistent(mut self, persistent: bool) -> Self {
95        self.persistent = persistent;
96        self
97    }
98    /// Executes count query for pagination
99    ///
100    /// # Arguments
101    /// * `db_adapter` - Database connection adapter
102    #[inline]
103    pub async fn count<'c, Adapter>(&'q mut self, db_adapter: Adapter) -> Result<i64, Error>
104    where
105        Adapter: BackendDB<'c, DB>,
106        (i64,): for<'r> FromRow<'r, DB::Row>,
107    {
108        let (mut sql, arg, db_type, executor) = Self::render_sql_with_adapter(
109            self.template.clone(),
110            db_adapter,
111            self.page_no,
112            self.page_size,
113        )
114        .await?;
115
116        db_type.write_count_sql(&mut sql);
117        self.sql = sql;
118        let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
119        let (count,): (i64,) = execute.fetch_one_as(executor).await?;
120        Ok(count)
121    }
122    /// Calculates complete pagination metadata
123    ///
124    /// # Arguments
125    /// * `page_size` - Records per page
126    /// * `db_adapter` - Database connection adapter
127    #[inline]
128    pub async fn count_page<'c, Adapter>(
129        &'q mut self,
130
131        page_size: i64,
132        db_adapter: Adapter,
133    ) -> Result<PageInfo, Error>
134    where
135        Adapter: BackendDB<'c, DB>,
136        (i64,): for<'r> FromRow<'r, DB::Row>,
137    {
138        let count = self.count(db_adapter).await?;
139
140        Ok(PageInfo::new(count, page_size))
141    }
142    /// Sets pagination parameters
143    pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
144        self.page_no = Some(page_no);
145        self.page_size = Some(page_size);
146        self
147    }
148    /// Core SQL rendering method with pagination support
149    #[inline]
150    pub async fn render_sql_with_adapter<'c, Adapter>(
151        template: T,
152
153        db_adapter: Adapter,
154        page_no: Option<i64>,
155        page_size: Option<i64>,
156    ) -> Result<
157        (
158            String,
159            Option<DB::Arguments<'q>>,
160            impl DatabaseDialect,
161            impl Executor<'c, Database = DB>,
162        ),
163        Error,
164    >
165    where
166        Adapter: BackendDB<'c, DB>,
167    {
168        let (db_type, executor) = db_adapter.backend_db().await?;
169        let f = db_type.get_encode_placeholder_fn();
170        let mut sql = String::new();
171        let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
172
173        if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
174            let mut args = arg.unwrap_or_default();
175            db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
176            arg = Some(args);
177        }
178        Ok((sql, arg, db_type, executor))
179    }
180
181    /// like sqlx::Query::execute
182    /// Execute the query and return the number of rows affected.
183    #[inline]
184    pub async fn execute<'c, Adapter>(
185        &'q mut self,
186        db_adapter: Adapter,
187    ) -> Result<DB::QueryResult, Error>
188    where
189        Adapter: BackendDB<'c, DB>,
190    {
191        self.execute_many(db_adapter).await.try_collect().await
192    }
193    /// like    sqlx::Query::execute_many
194    /// Execute multiple queries and return the rows affected from each query, in a stream.
195    #[inline]
196    pub async fn execute_many<'c, 'e, Adapter>(
197        &'q mut self,
198
199        db_adapter: Adapter,
200    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
201    where
202        'c: 'e,
203        'q: 'e,
204        Adapter: BackendDB<'c, DB>,
205    {
206        self.fetch_many(db_adapter)
207            .await
208            .try_filter_map(|step| async move {
209                Ok(match step {
210                    Either::Left(rows) => Some(rows),
211                    Either::Right(_) => None,
212                })
213            })
214            .boxed()
215    }
216    /// like sqlx::Query::fetch
217    /// Execute the query and return the generated results as a stream.
218    #[inline]
219    pub async fn fetch<'c, 'e, Adapter>(
220        &'q mut self,
221
222        db_adapter: Adapter,
223    ) -> BoxStream<'e, Result<DB::Row, Error>>
224    where
225        'c: 'e,
226        'q: 'e,
227        Adapter: BackendDB<'c, DB>,
228    {
229        self.fetch_many(db_adapter)
230            .await
231            .try_filter_map(|step| async move {
232                Ok(match step {
233                    Either::Left(_) => None,
234                    Either::Right(row) => Some(row),
235                })
236            })
237            .boxed()
238    }
239    /// like sqlx::Query::fetch_many
240    /// Execute multiple queries and return the generated results as a stream.
241    ///
242    /// For each query in the stream, any generated rows are returned first,
243    /// then the `QueryResult` with the number of rows affected.
244    #[inline]
245    #[allow(clippy::type_complexity)]
246    pub async fn fetch_many<'c, 'e, Adapter>(
247        &'q mut self,
248        db_adapter: Adapter,
249    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
250    where
251        'c: 'e,
252        'q: 'e,
253        Adapter: BackendDB<'c, DB>,
254    {
255        let res = Self::render_sql_with_adapter(
256            self.template.clone(),
257            db_adapter,
258            self.page_no,
259            self.page_size,
260        )
261        .await;
262
263        match res {
264            Ok((sql, arg, _db_type, executor)) => {
265                self.sql = sql;
266                let execute =
267                    SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
268                execute.fetch_many(executor)
269            }
270            Err(e) => stream::once(async move { Err(e) }).boxed(),
271        }
272    }
273
274    /// like sqlx::Query::fetch_all
275    /// Execute the query and return all the resulting rows collected into a [`Vec`].
276    ///
277    /// ### Note: beware result set size.
278    /// This will attempt to collect the full result set of the query into memory.
279    ///
280    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
281    /// e.g. using `LIMIT`.
282    #[inline]
283    pub async fn fetch_all<'c, Adapter>(
284        &'q mut self,
285        db_adapter: Adapter,
286    ) -> Result<Vec<DB::Row>, Error>
287    where
288        Adapter: BackendDB<'c, DB>,
289    {
290        self.fetch(db_adapter).await.try_collect().await
291    }
292    /// like sqlx::Query::fetch_one
293    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
294    ///
295    /// ### Note: for best performance, ensure the query returns at most one row.
296    /// Depending on the driver implementation, if your query can return more than one row,
297    /// it may lead to wasted CPU time and bandwidth on the database server.
298    ///
299    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
300    /// can result in a more optimal query plan.
301    ///
302    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
303    ///
304    /// Otherwise, you might want to add `LIMIT 1` to your query.
305    #[inline]
306    pub async fn fetch_one<'c, Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
307    where
308        Adapter: BackendDB<'c, DB>,
309    {
310        self.fetch_optional(db_adapter)
311            .await
312            .and_then(|row| match row {
313                Some(row) => Ok(row),
314                None => Err(Error::RowNotFound),
315            })
316    }
317    /// like sqlx::Query::fetch_optional
318    /// Execute the query, returning the first row or `None` otherwise.
319    ///
320    /// ### Note: for best performance, ensure the query returns at most one row.
321    /// Depending on the driver implementation, if your query can return more than one row,
322    /// it may lead to wasted CPU time and bandwidth on the database server.
323    ///
324    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
325    /// can result in a more optimal query plan.
326    ///
327    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
328    ///
329    /// Otherwise, you might want to add `LIMIT 1` to your query.
330    #[inline]
331    pub async fn fetch_optional<'c, Adapter>(
332        &'q mut self,
333
334        db_adapter: Adapter,
335    ) -> Result<Option<DB::Row>, Error>
336    where
337        Adapter: BackendDB<'c, DB>,
338    {
339        self.fetch(db_adapter).await.try_next().await
340    }
341
342    /// like sqlx::QueryAs::fetch
343    /// Execute the query and return the generated results as a stream.
344    pub async fn fetch_as<'c, 'e, Adapter, O>(
345        &'q mut self,
346
347        db_adapter: Adapter,
348    ) -> BoxStream<'e, Result<O, Error>>
349    where
350        'c: 'e,
351        'q: 'e,
352        Adapter: BackendDB<'c, DB>,
353        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
354    {
355        self.fetch_many_as(db_adapter)
356            .await
357            .try_filter_map(|step| async move { Ok(step.right()) })
358            .boxed()
359    }
360    /// like sqlx::QueryAs::fetch_many
361    /// Execute multiple queries and return the generated results as a stream
362    /// from each query, in a stream.
363    pub async fn fetch_many_as<'c, 'e, Adapter, O>(
364        &'q mut self,
365        db_adapter: Adapter,
366    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
367    where
368        'c: 'e,
369        'q: 'e,
370        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
371        Adapter: BackendDB<'c, DB>,
372    {
373        self.fetch_many(db_adapter)
374            .await
375            .map(|v| match v {
376                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
377                Ok(Either::Left(v)) => Ok(Either::Left(v)),
378                Err(e) => Err(e),
379            })
380            .boxed()
381    }
382    /// like sqlx::QueryAs::fetch_all
383    /// Execute the query and return all the resulting rows collected into a [`Vec`].
384    ///
385    /// ### Note: beware result set size.
386    /// This will attempt to collect the full result set of the query into memory.
387    ///
388    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
389    /// e.g. using `LIMIT`.
390    #[inline]
391    pub async fn fetch_all_as<'c, Adapter, O>(
392        &'q mut self,
393        db_adapter: Adapter,
394    ) -> Result<Vec<O>, Error>
395    where
396        Adapter: BackendDB<'c, DB>,
397        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
398    {
399        self.fetch_as(db_adapter).await.try_collect().await
400    }
401    /// like sqlx::QueryAs::fetch_one
402    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
403    ///
404    /// ### Note: for best performance, ensure the query returns at most one row.
405    /// Depending on the driver implementation, if your query can return more than one row,
406    /// it may lead to wasted CPU time and bandwidth on the database server.
407    ///
408    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
409    /// can result in a more optimal query plan.
410    ///
411    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
412    ///
413    /// Otherwise, you might want to add `LIMIT 1` to your query.
414    pub async fn fetch_one_as<'c, Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
415    where
416        Adapter: BackendDB<'c, DB>,
417        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
418    {
419        self.fetch_optional_as(db_adapter)
420            .await
421            .and_then(|row| row.ok_or(Error::RowNotFound))
422    }
423    /// like sqlx::QueryAs::fetch_optional
424    /// Execute the query, returning the first row or `None` otherwise.
425    ///
426    /// ### Note: for best performance, ensure the query returns at most one row.
427    /// Depending on the driver implementation, if your query can return more than one row,
428    /// it may lead to wasted CPU time and bandwidth on the database server.
429    ///
430    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
431    /// can result in a more optimal query plan.
432    ///
433    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
434    ///
435    /// Otherwise, you might want to add `LIMIT 1` to your query.
436    pub async fn fetch_optional_as<'c, Adapter, O>(
437        &'q mut self,
438
439        db_adapter: Adapter,
440    ) -> Result<Option<O>, Error>
441    where
442        Adapter: BackendDB<'c, DB>,
443        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
444    {
445        let row = self.fetch_optional(db_adapter).await?;
446        if let Some(row) = row {
447            O::from_row(&row).map(Some)
448        } else {
449            Ok(None)
450        }
451    }
452}