Skip to main content

sqlx_askama_template/v3/
template_adapter.rs

1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{Stream, future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future};
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, from_row::FromRow, types::Type,
10};
11
12use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
13
14/// Pagination metadata container
15#[derive(Debug, PartialEq, Eq)]
16pub struct PageInfo {
17    /// Total number of records
18    pub total: i64,
19    /// Records per page
20    pub page_size: i64,
21    /// Calculated page count
22    pub page_count: i64,
23}
24
25impl PageInfo {
26    /// Constructs new PageInfo with automatic page count calculation
27    ///
28    /// # Arguments
29    /// * `total` - Total records in dataset
30    /// * `page_size` - Desired records per page
31    pub fn new(total: i64, page_size: i64) -> PageInfo {
32        let mut page_count = total / page_size;
33        if total % page_size > 0 {
34            page_count += 1;
35        }
36        Self {
37            total,
38            page_size,
39            page_count,
40        }
41    }
42}
43/// Database adapter manager handling SQL rendering and execution
44///
45/// # Generic Parameters
46/// - `'q`: Query lifetime
47/// - `DB`: Database type
48/// - `T`: SQL template type
49pub struct DBAdapterManager<'q, DB, T>
50where
51    DB: Database,
52    T: SqlTemplate<'q, DB>,
53{
54    pub(crate) sql: String,
55    pub(crate) template: T,
56    persistent: bool,
57    _p: PhantomData<&'q DB>,
58    page_size: Option<i64>,
59    page_no: Option<i64>,
60}
61
62impl<'q, DB, T> DBAdapterManager<'q, DB, T>
63where
64    DB: Database,
65    T: SqlTemplate<'q, DB>,
66{
67    /// Creates new adapter with SQL buffer
68    ///
69    /// # Arguments
70    /// * `template` - SQL template instance
71    pub fn new(template: T) -> Self {
72        Self {
73            sql: String::new(),
74            template,
75            persistent: true,
76            page_no: None,
77            page_size: None,
78            _p: PhantomData,
79        }
80    }
81
82    pub fn sql(&self) -> &String {
83        &self.sql
84    }
85}
86impl<'q, 'c, 'e, DB, T> DBAdapterManager<'q, DB, T>
87where
88    DB: Database + Sync,
89    T: SqlTemplate<'q, DB> + Send + 'q,
90    i64: Encode<'q, DB> + Type<DB>,
91    DB::Arguments<'q>: 'q,
92    'q: 'e,
93    'c: 'e,
94{
95    /// Configures query persistence (default: true)
96    pub fn set_persistent(mut self, persistent: bool) -> Self {
97        self.persistent = persistent;
98        self
99    }
100    /// Executes count query for pagination
101    ///
102    /// # Arguments
103    /// * `db_adapter` - Database connection adapter
104    #[inline]
105    pub fn count<Adapter>(&'q mut self, db_adapter: Adapter) -> BoxFuture<'e, Result<i64, Error>>
106    where
107        Adapter: BackendDB<'c, DB> + 'c,
108        (i64,): for<'r> FromRow<'r, DB::Row>,
109    {
110        let template = self.template.clone();
111
112        async move {
113            let (db_type, executor) = db_adapter.backend_db().await?;
114            let f = db_type.get_encode_placeholder_fn();
115            let mut sql = String::new();
116            let arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
117
118            db_type.write_count_sql(&mut sql);
119            self.sql = sql;
120            let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
121            let (count,): (i64,) = execute.fetch_one_as(executor).await?;
122            Ok(count)
123        }
124        .boxed()
125    }
126    /// Calculates complete pagination metadata
127    ///
128    /// # Arguments
129    /// * `page_size` - Records per page
130    /// * `db_adapter` - Database connection adapter
131    #[inline]
132    pub async fn count_page<Adapter>(
133        &'q mut self,
134        page_size: i64,
135        db_adapter: Adapter,
136    ) -> Result<PageInfo, Error>
137    where
138        Adapter: BackendDB<'c, DB> + 'c,
139        (i64,): for<'r> FromRow<'r, DB::Row>,
140    {
141        let count = self.count(db_adapter).await?;
142        Ok(PageInfo::new(count, page_size))
143    }
144    /// Sets pagination parameters
145    pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
146        self.page_no = Some(page_no);
147        self.page_size = Some(page_size);
148        self
149    }
150
151    /// like sqlx::Query::execute
152    /// Execute the query and return the number of rows affected.
153    #[inline]
154    pub async fn execute<Adapter>(
155        &'q mut self,
156        db_adapter: Adapter,
157    ) -> Result<DB::QueryResult, Error>
158    where
159        Adapter: BackendDB<'c, DB> + 'c,
160    {
161        self.execute_many(db_adapter).try_collect().await
162    }
163    /// like    sqlx::Query::execute_many
164    /// Execute multiple queries and return the rows affected from each query, in a stream.
165    #[inline]
166    pub fn execute_many<Adapter>(
167        &'q mut self,
168
169        db_adapter: Adapter,
170    ) -> impl Stream<Item = Result<DB::QueryResult, Error>>
171    where
172        Adapter: BackendDB<'c, DB> + 'c,
173    {
174        self.fetch_many(db_adapter)
175            .try_filter_map(|step| async move {
176                Ok(match step {
177                    Either::Left(rows) => Some(rows),
178                    Either::Right(_) => None,
179                })
180            })
181    }
182    /// like sqlx::Query::fetch
183    /// Execute the query and return the generated results as a stream.
184    #[inline]
185    pub fn fetch<Adapter>(
186        &'q mut self,
187        db_adapter: Adapter,
188    ) -> impl Stream<Item = Result<DB::Row, Error>>
189    where
190        Adapter: BackendDB<'c, DB> + 'c,
191    {
192        self.fetch_many(db_adapter)
193            .try_filter_map(|step| async move {
194                Ok(match step {
195                    Either::Left(_) => None,
196                    Either::Right(row) => Some(row),
197                })
198            })
199    }
200    /// like sqlx::Query::fetch_many
201    /// Execute multiple queries and return the generated results as a stream.
202    ///
203    /// For each query in the stream, any generated rows are returned first,
204    /// then the `QueryResult` with the number of rows affected.
205    #[inline]
206    #[allow(clippy::type_complexity)]
207    pub fn fetch_many<Adapter>(
208        &'q mut self,
209        db_adapter: Adapter,
210    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
211    where
212        Adapter: BackendDB<'c, DB> + 'c,
213    {
214        let template = self.template.clone();
215        let page_no = self.page_no;
216        let page_size = self.page_size;
217        Box::pin(async_stream::try_stream! {
218            let (db_type, executor) = db_adapter.backend_db().await?;
219            let f = db_type.get_encode_placeholder_fn();
220            let mut sql = String::new();
221            let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
222
223            if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
224                let mut args = arg.unwrap_or_default();
225                db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
226                arg = Some(args);
227            }
228
229            self.sql = sql;
230            let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
231            let mut stream = execute.fetch_many(executor);
232            while let Some(item) = stream.try_next().await? {
233                yield item;
234            }
235        })
236    }
237
238    /// like sqlx::Query::fetch_all
239    /// Execute the query and return all the resulting rows collected into a [`Vec`].
240    ///
241    /// ### Note: beware result set size.
242    /// This will attempt to collect the full result set of the query into memory.
243    ///
244    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
245    /// e.g. using `LIMIT`.
246    #[inline]
247    pub async fn fetch_all<Adapter>(
248        &'q mut self,
249        db_adapter: Adapter,
250    ) -> Result<Vec<DB::Row>, Error>
251    where
252        Adapter: BackendDB<'c, DB> + 'c,
253    {
254        self.fetch(db_adapter).try_collect().await
255    }
256    /// like sqlx::Query::fetch_one
257    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
258    ///
259    /// ### Note: for best performance, ensure the query returns at most one row.
260    /// Depending on the driver implementation, if your query can return more than one row,
261    /// it may lead to wasted CPU time and bandwidth on the database server.
262    ///
263    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
264    /// can result in a more optimal query plan.
265    ///
266    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
267    ///
268    /// Otherwise, you might want to add `LIMIT 1` to your query.
269    #[inline]
270    pub async fn fetch_one<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
271    where
272        Adapter: BackendDB<'c, DB> + 'c,
273    {
274        self.fetch_optional(db_adapter)
275            .and_then(|row| match row {
276                Some(row) => future::ok(row),
277                None => future::err(Error::RowNotFound),
278            })
279            .await
280    }
281    /// like sqlx::Query::fetch_optional
282    /// Execute the query, returning the first row or `None` otherwise.
283    ///
284    /// ### Note: for best performance, ensure the query returns at most one row.
285    /// Depending on the driver implementation, if your query can return more than one row,
286    /// it may lead to wasted CPU time and bandwidth on the database server.
287    ///
288    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
289    /// can result in a more optimal query plan.
290    ///
291    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
292    ///
293    /// Otherwise, you might want to add `LIMIT 1` to your query.
294    #[inline]
295    pub async fn fetch_optional<Adapter>(
296        &'q mut self,
297
298        db_adapter: Adapter,
299    ) -> Result<Option<DB::Row>, Error>
300    where
301        Adapter: BackendDB<'c, DB> + 'c,
302    {
303        let row = self.fetch_many(db_adapter).try_next().await?;
304        match row {
305            Some(Either::Right(row)) => Ok(Some(row)),
306            Some(Either::Left(_)) => Ok(None),
307            None => Ok(None),
308        }
309    }
310
311    /// like sqlx::QueryAs::fetch
312    /// Execute the query and return the generated results as a stream.
313    pub async fn fetch_as<Adapter, O>(
314        &'q mut self,
315
316        db_adapter: Adapter,
317    ) -> impl Stream<Item = Result<O, Error>>
318    where
319        Adapter: BackendDB<'c, DB> + 'c,
320        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
321    {
322        self.fetch_many_as(db_adapter)
323            .try_filter_map(|step| async move { Ok(step.right()) })
324    }
325    /// like sqlx::QueryAs::fetch_many
326    /// Execute multiple queries and return the generated results as a stream
327    /// from each query, in a stream.
328    pub fn fetch_many_as<Adapter, O>(
329        &'q mut self,
330        db_adapter: Adapter,
331    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
332    where
333        Adapter: BackendDB<'c, DB> + 'c,
334        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
335    {
336        let template = self.template.clone();
337        let page_no = self.page_no;
338        let page_size = self.page_size;
339        Box::pin(async_stream::try_stream! {
340        let (db_type, executor) = db_adapter.backend_db().await?;
341        let f = db_type.get_encode_placeholder_fn();
342        let mut sql = String::new();
343        let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
344
345        if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
346            let mut args = arg.unwrap_or_default();
347            db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
348            arg = Some(args);
349        }
350
351        self.sql = sql;
352        let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
353        let mut stream = execute.fetch_many(executor).map(|v| match v {
354            Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
355            Ok(Either::Left(v)) => Ok(Either::Left(v)),
356            Err(e) => Err(e),
357        });
358        while let Some(item) = stream.try_next().await? {
359            yield item;
360        }
361          })
362    }
363    /// like sqlx::QueryAs::fetch_all
364    /// Execute the query and return all the resulting rows collected into a [`Vec`].
365    ///
366    /// ### Note: beware result set size.
367    /// This will attempt to collect the full result set of the query into memory.
368    ///
369    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
370    /// e.g. using `LIMIT`.
371    #[inline]
372    pub async fn fetch_all_as<Adapter, O>(
373        &'q mut self,
374        db_adapter: Adapter,
375    ) -> Result<Vec<O>, Error>
376    where
377        Adapter: BackendDB<'c, DB> + 'c,
378        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
379    {
380        self.fetch_as(db_adapter).await.try_collect().await
381    }
382    /// like sqlx::QueryAs::fetch_one
383    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
384    ///
385    /// ### Note: for best performance, ensure the query returns at most one row.
386    /// Depending on the driver implementation, if your query can return more than one row,
387    /// it may lead to wasted CPU time and bandwidth on the database server.
388    ///
389    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
390    /// can result in a more optimal query plan.
391    ///
392    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
393    ///
394    /// Otherwise, you might want to add `LIMIT 1` to your query.
395    pub async fn fetch_one_as<Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
396    where
397        Adapter: BackendDB<'c, DB> + 'c,
398        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
399    {
400        self.fetch_optional_as(db_adapter)
401            .and_then(|o| match o {
402                Some(o) => future::ok(o),
403                None => future::err(Error::RowNotFound),
404            })
405            .await
406    }
407    /// like sqlx::QueryAs::fetch_optional
408    /// Execute the query, returning the first row or `None` otherwise.
409    ///
410    /// ### Note: for best performance, ensure the query returns at most one row.
411    /// Depending on the driver implementation, if your query can return more than one row,
412    /// it may lead to wasted CPU time and bandwidth on the database server.
413    ///
414    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
415    /// can result in a more optimal query plan.
416    ///
417    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
418    ///
419    /// Otherwise, you might want to add `LIMIT 1` to your query.
420    pub async fn fetch_optional_as<Adapter, O>(
421        &'q mut self,
422
423        db_adapter: Adapter,
424    ) -> Result<Option<O>, Error>
425    where
426        Adapter: BackendDB<'c, DB> + 'c,
427        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
428    {
429        let obj = self.fetch_many_as(db_adapter).try_next().await?;
430        match obj {
431            Some(Either::Right(o)) => Ok(Some(o)),
432            Some(Either::Left(_)) => Ok(None),
433            None => Ok(None),
434        }
435    }
436}