Skip to main content

sqlx_askama_template/v3/
template_adapter.rs

1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{Stream, future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future};
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, from_row::FromRow, types::Type,
10};
11
12use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
13
/// Pagination metadata container
#[derive(Debug, PartialEq, Eq)]
pub struct PageInfo {
    /// Total number of records
    pub total: i64,
    /// Records per page
    pub page_size: i64,
    /// Calculated page count
    pub page_count: i64,
}

impl PageInfo {
    /// Constructs a new `PageInfo`, deriving `page_count` from the other two values.
    ///
    /// `page_count` is `total / page_size` rounded up, so a partially filled
    /// last page is counted as a full page.
    ///
    /// A non-positive `total` or `page_size` yields a `page_count` of `0`
    /// instead of panicking on division by zero (or overflowing on
    /// `i64::MIN / -1`) or reporting a negative number of pages. `total` and
    /// `page_size` themselves are stored unchanged.
    ///
    /// # Arguments
    /// * `total` - Total records in dataset
    /// * `page_size` - Desired records per page
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if total <= 0 || page_size <= 0 {
            // Nothing to paginate, or no valid way to split it into pages.
            0
        } else {
            // Both operands are strictly positive here, so the division cannot
            // trap and adding the 0/1 remainder flag cannot overflow.
            total / page_size + i64::from(total % page_size != 0)
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
43/// Database adapter manager handling SQL rendering and execution
44///
45/// # Generic Parameters
46/// - `'q`: Query lifetime
47/// - `DB`: Database type
48/// - `T`: SQL template type
49pub struct DBAdapterManager<'q, DB, T>
50where
51    DB: Database,
52    T: SqlTemplate<'q, DB>,
53{
54    pub(crate) sql: String,
55    pub(crate) template: T,
56    persistent: bool,
57    _p: PhantomData<&'q DB>,
58    page_size: Option<i64>,
59    page_no: Option<i64>,
60}
61
62impl<'q, DB, T> DBAdapterManager<'q, DB, T>
63where
64    DB: Database,
65    T: SqlTemplate<'q, DB>,
66{
67    /// Creates new adapter with SQL buffer
68    ///
69    /// # Arguments
70    /// * `template` - SQL template instance
71    pub fn new(template: T) -> Self {
72        Self {
73            sql: String::new(),
74            template,
75            persistent: true,
76            page_no: None,
77            page_size: None,
78            _p: PhantomData,
79        }
80    }
81
82    pub fn sql(&self) -> &String {
83        &self.sql
84    }
85}
86impl<'q, 'c, 'e, DB, T> DBAdapterManager<'q, DB, T>
87where
88    DB: Database + Sync,
89    T: SqlTemplate<'q, DB> + Send + 'q,
90    i64: Encode<'q, DB> + Type<DB>,
91    DB::Arguments<'q>: 'q,
92    'q: 'e,
93    'c: 'e,
94{
95    /// Configures query persistence (default: true)
96    pub fn set_persistent(mut self, persistent: bool) -> Self {
97        self.persistent = persistent;
98        self
99    }
100    /// Executes count query for pagination
101    ///
102    /// # Arguments
103    /// * `db_adapter` - Database connection adapter
104    #[inline]
105    pub fn count<Adapter>(&'q mut self, db_adapter: Adapter) -> BoxFuture<'e, Result<i64, Error>>
106    where
107        Adapter: BackendDB<'c, DB> + 'c,
108        (i64,): for<'r> FromRow<'r, DB::Row>,
109    {
110        let template = self.template.clone();
111        let page_no = self.page_no;
112        let page_size = self.page_size;
113        async move {
114            let (db_type, executor) = db_adapter.backend_db().await?;
115            let f = db_type.get_encode_placeholder_fn();
116            let mut sql = String::new();
117            let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
118
119            if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
120                let mut args = arg.unwrap_or_default();
121                db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
122                arg = Some(args);
123            }
124
125            db_type.write_count_sql(&mut sql);
126            self.sql = sql;
127            let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
128            let (count,): (i64,) = execute.fetch_one_as(executor).await?;
129            Ok(count)
130        }
131        .boxed()
132    }
133    /// Calculates complete pagination metadata
134    ///
135    /// # Arguments
136    /// * `page_size` - Records per page
137    /// * `db_adapter` - Database connection adapter
138    #[inline]
139    pub async fn count_page<Adapter>(
140        &'q mut self,
141        page_size: i64,
142        db_adapter: Adapter,
143    ) -> Result<PageInfo, Error>
144    where
145        Adapter: BackendDB<'c, DB> + 'c,
146        (i64,): for<'r> FromRow<'r, DB::Row>,
147    {
148        let count = self.count(db_adapter).await?;
149        Ok(PageInfo::new(count, page_size))
150    }
151    /// Sets pagination parameters
152    pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
153        self.page_no = Some(page_no);
154        self.page_size = Some(page_size);
155        self
156    }
157
158    /// like sqlx::Query::execute
159    /// Execute the query and return the number of rows affected.
160    #[inline]
161    pub async fn execute<Adapter>(
162        &'q mut self,
163        db_adapter: Adapter,
164    ) -> Result<DB::QueryResult, Error>
165    where
166        Adapter: BackendDB<'c, DB> + 'c,
167    {
168        self.execute_many(db_adapter).try_collect().await
169    }
170    /// like    sqlx::Query::execute_many
171    /// Execute multiple queries and return the rows affected from each query, in a stream.
172    #[inline]
173    pub fn execute_many<Adapter>(
174        &'q mut self,
175
176        db_adapter: Adapter,
177    ) -> impl Stream<Item = Result<DB::QueryResult, Error>>
178    where
179        Adapter: BackendDB<'c, DB> + 'c,
180    {
181        self.fetch_many(db_adapter)
182            .try_filter_map(|step| async move {
183                Ok(match step {
184                    Either::Left(rows) => Some(rows),
185                    Either::Right(_) => None,
186                })
187            })
188    }
189    /// like sqlx::Query::fetch
190    /// Execute the query and return the generated results as a stream.
191    #[inline]
192    pub fn fetch<Adapter>(
193        &'q mut self,
194        db_adapter: Adapter,
195    ) -> impl Stream<Item = Result<DB::Row, Error>>
196    where
197        Adapter: BackendDB<'c, DB> + 'c,
198    {
199        self.fetch_many(db_adapter)
200            .try_filter_map(|step| async move {
201                Ok(match step {
202                    Either::Left(_) => None,
203                    Either::Right(row) => Some(row),
204                })
205            })
206    }
207    /// like sqlx::Query::fetch_many
208    /// Execute multiple queries and return the generated results as a stream.
209    ///
210    /// For each query in the stream, any generated rows are returned first,
211    /// then the `QueryResult` with the number of rows affected.
212    #[inline]
213    #[allow(clippy::type_complexity)]
214    pub fn fetch_many<Adapter>(
215        &'q mut self,
216        db_adapter: Adapter,
217    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
218    where
219        Adapter: BackendDB<'c, DB> + 'c,
220    {
221        let template = self.template.clone();
222        let page_no = self.page_no;
223        let page_size = self.page_size;
224        Box::pin(async_stream::try_stream! {
225            let (db_type, executor) = db_adapter.backend_db().await?;
226            let f = db_type.get_encode_placeholder_fn();
227            let mut sql = String::new();
228            let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
229
230            if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
231                let mut args = arg.unwrap_or_default();
232                db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
233                arg = Some(args);
234            }
235
236            self.sql = sql;
237            let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
238            let mut stream = execute.fetch_many(executor);
239            while let Some(item) = stream.try_next().await? {
240                yield item;
241            }
242        })
243    }
244
245    /// like sqlx::Query::fetch_all
246    /// Execute the query and return all the resulting rows collected into a [`Vec`].
247    ///
248    /// ### Note: beware result set size.
249    /// This will attempt to collect the full result set of the query into memory.
250    ///
251    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
252    /// e.g. using `LIMIT`.
253    #[inline]
254    pub async fn fetch_all<Adapter>(
255        &'q mut self,
256        db_adapter: Adapter,
257    ) -> Result<Vec<DB::Row>, Error>
258    where
259        Adapter: BackendDB<'c, DB> + 'c,
260    {
261        self.fetch(db_adapter).try_collect().await
262    }
263    /// like sqlx::Query::fetch_one
264    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
265    ///
266    /// ### Note: for best performance, ensure the query returns at most one row.
267    /// Depending on the driver implementation, if your query can return more than one row,
268    /// it may lead to wasted CPU time and bandwidth on the database server.
269    ///
270    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
271    /// can result in a more optimal query plan.
272    ///
273    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
274    ///
275    /// Otherwise, you might want to add `LIMIT 1` to your query.
276    #[inline]
277    pub async fn fetch_one<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
278    where
279        Adapter: BackendDB<'c, DB> + 'c,
280    {
281        self.fetch_optional(db_adapter)
282            .and_then(|row| match row {
283                Some(row) => future::ok(row),
284                None => future::err(Error::RowNotFound),
285            })
286            .await
287    }
288    /// like sqlx::Query::fetch_optional
289    /// Execute the query, returning the first row or `None` otherwise.
290    ///
291    /// ### Note: for best performance, ensure the query returns at most one row.
292    /// Depending on the driver implementation, if your query can return more than one row,
293    /// it may lead to wasted CPU time and bandwidth on the database server.
294    ///
295    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
296    /// can result in a more optimal query plan.
297    ///
298    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
299    ///
300    /// Otherwise, you might want to add `LIMIT 1` to your query.
301    #[inline]
302    pub async fn fetch_optional<Adapter>(
303        &'q mut self,
304
305        db_adapter: Adapter,
306    ) -> Result<Option<DB::Row>, Error>
307    where
308        Adapter: BackendDB<'c, DB> + 'c,
309    {
310        let row = self.fetch_many(db_adapter).try_next().await?;
311        match row {
312            Some(Either::Right(row)) => Ok(Some(row)),
313            Some(Either::Left(_)) => Ok(None),
314            None => Ok(None),
315        }
316    }
317
318    /// like sqlx::QueryAs::fetch
319    /// Execute the query and return the generated results as a stream.
320    pub async fn fetch_as<Adapter, O>(
321        &'q mut self,
322
323        db_adapter: Adapter,
324    ) -> impl Stream<Item = Result<O, Error>>
325    where
326        Adapter: BackendDB<'c, DB> + 'c,
327        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
328    {
329        self.fetch_many_as(db_adapter)
330            .try_filter_map(|step| async move { Ok(step.right()) })
331    }
332    /// like sqlx::QueryAs::fetch_many
333    /// Execute multiple queries and return the generated results as a stream
334    /// from each query, in a stream.
335    pub fn fetch_many_as<Adapter, O>(
336        &'q mut self,
337        db_adapter: Adapter,
338    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
339    where
340        'q: 'e,
341        Adapter: BackendDB<'c, DB> + 'c,
342        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
343    {
344        let template = self.template.clone();
345        let page_no = self.page_no;
346        let page_size = self.page_size;
347        Box::pin(async_stream::try_stream! {
348        let (db_type, executor) = db_adapter.backend_db().await?;
349        let f = db_type.get_encode_placeholder_fn();
350        let mut sql = String::new();
351        let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
352
353        if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
354            let mut args = arg.unwrap_or_default();
355            db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
356            arg = Some(args);
357        }
358
359        self.sql = sql;
360        let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
361        let mut stream = execute.fetch_many(executor).map(|v| match v {
362            Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
363            Ok(Either::Left(v)) => Ok(Either::Left(v)),
364            Err(e) => Err(e),
365        });
366        while let Some(item) = stream.try_next().await? {
367            yield item;
368        }
369          })
370    }
371    /// like sqlx::QueryAs::fetch_all
372    /// Execute the query and return all the resulting rows collected into a [`Vec`].
373    ///
374    /// ### Note: beware result set size.
375    /// This will attempt to collect the full result set of the query into memory.
376    ///
377    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
378    /// e.g. using `LIMIT`.
379    #[inline]
380    pub async fn fetch_all_as<Adapter, O>(
381        &'q mut self,
382        db_adapter: Adapter,
383    ) -> Result<Vec<O>, Error>
384    where
385        Adapter: BackendDB<'c, DB> + 'c,
386        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
387    {
388        self.fetch_as(db_adapter).await.try_collect().await
389    }
390    /// like sqlx::QueryAs::fetch_one
391    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
392    ///
393    /// ### Note: for best performance, ensure the query returns at most one row.
394    /// Depending on the driver implementation, if your query can return more than one row,
395    /// it may lead to wasted CPU time and bandwidth on the database server.
396    ///
397    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
398    /// can result in a more optimal query plan.
399    ///
400    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
401    ///
402    /// Otherwise, you might want to add `LIMIT 1` to your query.
403    pub async fn fetch_one_as<Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
404    where
405        Adapter: BackendDB<'c, DB> + 'c,
406        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
407    {
408        self.fetch_optional_as(db_adapter)
409            .and_then(|o| match o {
410                Some(o) => future::ok(o),
411                None => future::err(Error::RowNotFound),
412            })
413            .await
414    }
415    /// like sqlx::QueryAs::fetch_optional
416    /// Execute the query, returning the first row or `None` otherwise.
417    ///
418    /// ### Note: for best performance, ensure the query returns at most one row.
419    /// Depending on the driver implementation, if your query can return more than one row,
420    /// it may lead to wasted CPU time and bandwidth on the database server.
421    ///
422    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
423    /// can result in a more optimal query plan.
424    ///
425    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
426    ///
427    /// Otherwise, you might want to add `LIMIT 1` to your query.
428    pub async fn fetch_optional_as<Adapter, O>(
429        &'q mut self,
430
431        db_adapter: Adapter,
432    ) -> Result<Option<O>, Error>
433    where
434        Adapter: BackendDB<'c, DB> + 'c,
435        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
436    {
437        self.fetch_optional(db_adapter)
438            .and_then(|opt_row| async move {
439                if let Some(row) = opt_row {
440                    O::from_row(&row).map(Some)
441                } else {
442                    Ok(None)
443                }
444            })
445            .await
446    }
447}