Skip to main content

sqlx_askama_template/v3/
template_adapter.rs

1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryStreamExt, stream};
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10    types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
14
15/// Pagination metadata container
16#[derive(Debug)]
17pub struct PageInfo {
18    /// Total number of records
19    pub total: i64,
20    /// Records per page
21    pub page_size: i64,
22    /// Calculated page count
23    pub page_count: i64,
24}
25
26impl PageInfo {
27    /// Constructs new PageInfo with automatic page count calculation
28    ///
29    /// # Arguments
30    /// * `total` - Total records in dataset
31    /// * `page_size` - Desired records per page
32    pub fn new(total: i64, page_size: i64) -> PageInfo {
33        let mut page_count = total / page_size;
34        if total % page_size > 0 {
35            page_count += 1;
36        }
37        Self {
38            total,
39            page_size,
40            page_count,
41        }
42    }
43}
44/// Database adapter manager handling SQL rendering and execution
45///
46/// # Generic Parameters
47/// - `'q`: Query lifetime
48/// - `DB`: Database type
49/// - `T`: SQL template type
50pub struct DBAdapterManager<'q, DB, T>
51where
52    DB: Database,
53    T: SqlTemplate<'q, DB>,
54{
55    pub(crate) sql: String,
56    pub(crate) template: T,
57    persistent: bool,
58    _p: PhantomData<&'q DB>,
59    page_size: Option<i64>,
60    page_no: Option<i64>,
61}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65    DB: Database,
66    T: SqlTemplate<'q, DB>,
67{
68    /// Creates new adapter with SQL buffer
69    ///
70    /// # Arguments
71    /// * `template` - SQL template instance
72    pub fn new(template: T) -> Self {
73        Self {
74            sql: String::new(),
75            template,
76            persistent: true,
77            page_no: None,
78            page_size: None,
79            _p: PhantomData,
80        }
81    }
82
83    pub fn sql(&self) -> &String {
84        &self.sql
85    }
86}
87impl<'q, DB, T> DBAdapterManager<'q, DB, T>
88where
89    DB: Database,
90    T: SqlTemplate<'q, DB> + Send + 'q,
91    i64: Encode<'q, DB> + Type<DB>,
92{
93    /// Configures query persistence (default: true)
94    pub fn set_persistent(mut self, persistent: bool) -> Self {
95        self.persistent = persistent;
96        self
97    }
98    /// Executes count query for pagination
99    ///
100    /// # Arguments
101    /// * `db_adapter` - Database connection adapter
102    #[inline]
103    pub async fn count<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<i64, Error>
104    where
105        (i64,): for<'r> FromRow<'r, DB::Row>,
106        Adapter: BackendDB<'q, DB> + 'q,
107    {
108        let (mut sql, arg, db_type, executor) = Self::render_sql_with_adapter(
109            self.template.clone(),
110            db_adapter,
111            self.page_no,
112            self.page_size,
113        )
114        .await?;
115
116        db_type.write_count_sql(&mut sql);
117        self.sql = sql;
118        let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
119        let (count,): (i64,) = execute.fetch_one_as(executor).await?;
120        Ok(count)
121    }
122    /// Calculates complete pagination metadata
123    ///
124    /// # Arguments
125    /// * `page_size` - Records per page
126    /// * `db_adapter` - Database connection adapter
127    #[inline]
128    pub async fn count_page<Adapter>(
129        &'q mut self,
130
131        page_size: i64,
132        db_adapter: Adapter,
133    ) -> Result<PageInfo, Error>
134    where
135        Adapter: BackendDB<'q, DB> + 'q,
136        (i64,): for<'r> FromRow<'r, DB::Row>,
137    {
138        let count = self.count(db_adapter).await?;
139
140        Ok(PageInfo::new(count, page_size))
141    }
142    /// Sets pagination parameters
143    pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
144        self.page_no = Some(page_no);
145        self.page_size = Some(page_size);
146        self
147    }
148    /// Core SQL rendering method with pagination support
149    #[allow(clippy::type_complexity)]
150    #[inline]
151    pub fn render_sql_with_adapter<Adapter>(
152        template: T,
153
154        db_adapter: Adapter,
155        page_no: Option<i64>,
156        page_size: Option<i64>,
157    ) -> BoxFuture<
158        'q,
159        Result<
160            (
161                String,
162                Option<DB::Arguments<'q>>,
163                impl DatabaseDialect,
164                impl Executor<'q, Database = DB> + 'q,
165            ),
166            Error,
167        >,
168    >
169    where
170        Adapter: BackendDB<'q, DB> + 'q,
171    {
172        async move {
173            let (db_type, executor) = db_adapter.backend_db().await?;
174            let f = db_type.get_encode_placeholder_fn();
175            let mut sql = String::new();
176            let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
177
178            if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
179                let mut args = arg.unwrap_or_default();
180                db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
181                arg = Some(args);
182            }
183            Ok((sql, arg, db_type, executor))
184        }
185        .boxed()
186    }
187
188    /// like sqlx::Query::execute
189    /// Execute the query and return the number of rows affected.
190    #[inline]
191    pub async fn execute<'c, Adapter>(
192        &'q mut self,
193        db_adapter: Adapter,
194    ) -> Result<DB::QueryResult, Error>
195    where
196        Adapter: BackendDB<'q, DB> + 'q,
197    {
198        self.execute_many(db_adapter).await.try_collect().await
199    }
200    /// like    sqlx::Query::execute_many
201    /// Execute multiple queries and return the rows affected from each query, in a stream.
202    #[inline]
203    pub async fn execute_many<'e, Adapter>(
204        &'q mut self,
205
206        db_adapter: Adapter,
207    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
208    where
209        'q: 'e,
210        Adapter: BackendDB<'q, DB> + 'q,
211    {
212        self.fetch_many(db_adapter)
213            .await
214            .try_filter_map(|step| async move {
215                Ok(match step {
216                    Either::Left(rows) => Some(rows),
217                    Either::Right(_) => None,
218                })
219            })
220            .boxed()
221    }
222    /// like sqlx::Query::fetch
223    /// Execute the query and return the generated results as a stream.
224    #[inline]
225    pub async fn fetch<'e, Adapter>(
226        &'q mut self,
227
228        db_adapter: Adapter,
229    ) -> BoxStream<'e, Result<DB::Row, Error>>
230    where
231        Adapter: BackendDB<'q, DB> + 'q,
232        'q: 'e,
233    {
234        self.fetch_many(db_adapter)
235            .await
236            .try_filter_map(|step| async move {
237                Ok(match step {
238                    Either::Left(_) => None,
239                    Either::Right(row) => Some(row),
240                })
241            })
242            .boxed()
243    }
244    /// like sqlx::Query::fetch_many
245    /// Execute multiple queries and return the generated results as a stream.
246    ///
247    /// For each query in the stream, any generated rows are returned first,
248    /// then the `QueryResult` with the number of rows affected.
249    #[inline]
250    #[allow(clippy::type_complexity)]
251    pub async fn fetch_many<'e, Adapter>(
252        &'q mut self,
253        db_adapter: Adapter,
254    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
255    where
256        'q: 'e,
257        Adapter: BackendDB<'q, DB> + 'q,
258    {
259        let res = Self::render_sql_with_adapter(
260            self.template.clone(),
261            db_adapter,
262            self.page_no,
263            self.page_size,
264        )
265        .await;
266
267        match res {
268            Ok((sql, arg, _db_type, executor)) => {
269                self.sql = sql;
270                let execute =
271                    SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
272                execute.fetch_many(executor)
273            }
274            Err(e) => stream::once(async move { Err(e) }).boxed(),
275        }
276    }
277
278    /// like sqlx::Query::fetch_all
279    /// Execute the query and return all the resulting rows collected into a [`Vec`].
280    ///
281    /// ### Note: beware result set size.
282    /// This will attempt to collect the full result set of the query into memory.
283    ///
284    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
285    /// e.g. using `LIMIT`.
286    #[inline]
287    pub async fn fetch_all<C, Adapter>(
288        &'q mut self,
289        db_adapter: Adapter,
290    ) -> Result<Vec<DB::Row>, Error>
291    where
292        Adapter: BackendDB<'q, DB> + 'q,
293    {
294        self.fetch(db_adapter).await.try_collect().await
295    }
296    /// like sqlx::Query::fetch_one
297    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
298    ///
299    /// ### Note: for best performance, ensure the query returns at most one row.
300    /// Depending on the driver implementation, if your query can return more than one row,
301    /// it may lead to wasted CPU time and bandwidth on the database server.
302    ///
303    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
304    /// can result in a more optimal query plan.
305    ///
306    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
307    ///
308    /// Otherwise, you might want to add `LIMIT 1` to your query.
309    #[inline]
310    pub async fn fetch_one<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
311    where
312        Adapter: BackendDB<'q, DB> + 'q,
313    {
314        self.fetch_optional(db_adapter)
315            .await
316            .and_then(|row| match row {
317                Some(row) => Ok(row),
318                None => Err(Error::RowNotFound),
319            })
320    }
321    /// like sqlx::Query::fetch_optional
322    /// Execute the query, returning the first row or `None` otherwise.
323    ///
324    /// ### Note: for best performance, ensure the query returns at most one row.
325    /// Depending on the driver implementation, if your query can return more than one row,
326    /// it may lead to wasted CPU time and bandwidth on the database server.
327    ///
328    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
329    /// can result in a more optimal query plan.
330    ///
331    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
332    ///
333    /// Otherwise, you might want to add `LIMIT 1` to your query.
334    #[inline]
335    pub async fn fetch_optional<Adapter>(
336        &'q mut self,
337
338        db_adapter: Adapter,
339    ) -> Result<Option<DB::Row>, Error>
340    where
341        Adapter: BackendDB<'q, DB> + 'q,
342    {
343        self.fetch(db_adapter).await.try_next().await
344    }
345
346    /// like sqlx::QueryAs::fetch
347    /// Execute the query and return the generated results as a stream.
348    pub async fn fetch_as<'e, Adapter, O>(
349        &'q mut self,
350
351        db_adapter: Adapter,
352    ) -> BoxStream<'e, Result<O, Error>>
353    where
354        'q: 'e,
355        Adapter: BackendDB<'q, DB> + 'q,
356        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
357    {
358        self.fetch_many_as(db_adapter)
359            .await
360            .try_filter_map(|step| async move { Ok(step.right()) })
361            .boxed()
362    }
363    /// like sqlx::QueryAs::fetch_many
364    /// Execute multiple queries and return the generated results as a stream
365    /// from each query, in a stream.
366    pub async fn fetch_many_as<'e, Adapter, O>(
367        &'q mut self,
368        db_adapter: Adapter,
369    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
370    where
371        'q: 'e,
372        Adapter: BackendDB<'q, DB> + 'q,
373        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
374    {
375        self.fetch_many(db_adapter)
376            .await
377            .map(|v| match v {
378                Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
379                Ok(Either::Left(v)) => Ok(Either::Left(v)),
380                Err(e) => Err(e),
381            })
382            .boxed()
383    }
384    /// like sqlx::QueryAs::fetch_all
385    /// Execute the query and return all the resulting rows collected into a [`Vec`].
386    ///
387    /// ### Note: beware result set size.
388    /// This will attempt to collect the full result set of the query into memory.
389    ///
390    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
391    /// e.g. using `LIMIT`.
392    #[inline]
393    pub async fn fetch_all_as<Adapter, O>(
394        &'q mut self,
395        db_adapter: Adapter,
396    ) -> Result<Vec<O>, Error>
397    where
398        Adapter: BackendDB<'q, DB> + 'q,
399        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
400    {
401        self.fetch_as(db_adapter).await.try_collect().await
402    }
403    /// like sqlx::QueryAs::fetch_one
404    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
405    ///
406    /// ### Note: for best performance, ensure the query returns at most one row.
407    /// Depending on the driver implementation, if your query can return more than one row,
408    /// it may lead to wasted CPU time and bandwidth on the database server.
409    ///
410    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
411    /// can result in a more optimal query plan.
412    ///
413    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
414    ///
415    /// Otherwise, you might want to add `LIMIT 1` to your query.
416    pub async fn fetch_one_as<Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
417    where
418        Adapter: BackendDB<'q, DB> + 'q,
419        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
420    {
421        self.fetch_optional_as(db_adapter)
422            .await
423            .and_then(|row| row.ok_or(Error::RowNotFound))
424    }
425    /// like sqlx::QueryAs::fetch_optional
426    /// Execute the query, returning the first row or `None` otherwise.
427    ///
428    /// ### Note: for best performance, ensure the query returns at most one row.
429    /// Depending on the driver implementation, if your query can return more than one row,
430    /// it may lead to wasted CPU time and bandwidth on the database server.
431    ///
432    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
433    /// can result in a more optimal query plan.
434    ///
435    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
436    ///
437    /// Otherwise, you might want to add `LIMIT 1` to your query.
438    pub async fn fetch_optional_as<Adapter, O>(
439        &'q mut self,
440
441        db_adapter: Adapter,
442    ) -> Result<Option<O>, Error>
443    where
444        Adapter: BackendDB<'q, DB> + 'q,
445        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
446    {
447        let row = self.fetch_optional(db_adapter).await?;
448        if let Some(row) = row {
449            O::from_row(&row).map(Some)
450        } else {
451            Ok(None)
452        }
453    }
454}