Skip to main content

sqlx_askama_template/
template_adapter.rs

1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{Stream, future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future};
8use sqlx_core::{
9    Either, Error, database::Database, encode::Encode, from_row::FromRow, types::Type,
10};
11
12use crate::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
13
/// Pagination metadata container.
///
/// Holds the total record count, the requested page size and the number of
/// pages derived from the two.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PaginationInfo {
    /// Total number of records
    pub total: i64,
    /// Records per page
    pub pagination_size: i64,
    /// Number of pages needed to hold `total` records
    pub pagination_count: i64,
}

impl PaginationInfo {
    /// Constructs a new `PaginationInfo`, calculating the page count automatically.
    ///
    /// The page count is `total / pagination_size`, plus one when the
    /// division leaves a positive remainder. A `pagination_size` of zero or
    /// less cannot describe any page, so it yields a page count of `0`
    /// instead of panicking on a division by zero.
    ///
    /// # Arguments
    /// * `total` - Total records in dataset
    /// * `pagination_size` - Desired records per page
    pub fn new(total: i64, pagination_size: i64) -> PaginationInfo {
        let pagination_count = if pagination_size > 0 {
            let full_pages = total / pagination_size;
            // A positive remainder means one extra, partially filled page.
            if total % pagination_size > 0 {
                full_pages + 1
            } else {
                full_pages
            }
        } else {
            0
        };
        Self {
            total,
            pagination_size,
            pagination_count,
        }
    }
}
43/// Database adapter manager handling SQL rendering and execution
44///
45/// # Generic Parameters
46/// - `'q`: Query lifetime
47/// - `DB`: Database type
48/// - `T`: SQL template type
49pub struct DBAdapter<'q, DB, T>
50where
51    DB: Database,
52    T: SqlTemplate<'q, DB>,
53{
54    pub(crate) template: T,
55    persistent: bool,
56    _p: PhantomData<&'q DB>,
57    pagination_size: Option<i64>,
58    pagination_no: Option<i64>,
59}
60
61impl<'q, DB, T> DBAdapter<'q, DB, T>
62where
63    DB: Database,
64    T: SqlTemplate<'q, DB>,
65{
66    /// Creates a new DBAdapter for the SQL template.
67    ///
68    /// # Arguments
69    /// * `template` - SQL template instance
70    pub fn new(template: T) -> Self {
71        Self {
72            template,
73            persistent: true,
74            pagination_no: None,
75            pagination_size: None,
76            _p: PhantomData,
77        }
78    }
79}
80impl<'q, 'c, 'e, DB, T> DBAdapter<'q, DB, T>
81where
82    DB: Database + Sync,
83    T: SqlTemplate<'q, DB> + Send + 'q,
84    i64: Encode<'q, DB> + Type<DB>,
85    DB::Arguments: 'q,
86    'q: 'e,
87    'c: 'e,
88{
89    /// Configures query persistence (default: true)
90    pub fn set_persistent(mut self, persistent: bool) -> Self {
91        self.persistent = persistent;
92        self
93    }
94    /// Executes count query for pagination
95    ///
96    /// # Arguments
97    /// * `db_adapter` - Database connection adapter
98    #[inline]
99    pub fn count<Adapter>(self, db_adapter: Adapter) -> BoxFuture<'e, Result<i64, Error>>
100    where
101        Adapter: BackendDB<'c, DB> + 'c,
102        (i64,): for<'r> FromRow<'r, DB::Row>,
103    {
104        let template = self.template.clone();
105
106        async move {
107            let (db_type, executor) = db_adapter.backend_db().await?;
108            let f = db_type.placeholder_fn();
109            let mut sql = String::new();
110            let arg = template.render_with_placeholder(f, &mut sql)?;
111
112            db_type.write_count_sql(&mut sql);
113            let execute = SqlTemplateExecute::new(sql, arg).set_persistent(self.persistent);
114            let (count,): (i64,) = execute.fetch_one_as(executor).await?;
115            Ok(count)
116        }
117        .boxed()
118    }
119    /// Calculates complete pagination metadata
120    ///
121    /// # Arguments
122    /// * `pagination_size` - Records per pagination
123    /// * `db_adapter` - Database connection adapter
124    #[inline]
125    pub async fn pagination_info<Adapter>(
126        self,
127        pagination_size: i64,
128        db_adapter: Adapter,
129    ) -> Result<PaginationInfo, Error>
130    where
131        Adapter: BackendDB<'c, DB> + 'c,
132        (i64,): for<'r> FromRow<'r, DB::Row>,
133    {
134        let count = self.count(db_adapter).await?;
135        Ok(PaginationInfo::new(count, pagination_size))
136    }
137    /// Sets pagination parameters
138    pub fn set_pagination(mut self, pagination_size: i64, pagination_no: i64) -> Self {
139        self.pagination_no = Some(pagination_no);
140        self.pagination_size = Some(pagination_size);
141        self
142    }
143
144    /// like sqlx::Query::execute
145    /// Execute the query and return the number of rows affected.
146    #[inline]
147    pub async fn execute<Adapter>(self, db_adapter: Adapter) -> Result<DB::QueryResult, Error>
148    where
149        Adapter: BackendDB<'c, DB> + 'c,
150    {
151        self.execute_many(db_adapter).try_collect().await
152    }
153    /// like    sqlx::Query::execute_many
154    /// Execute multiple queries and return the rows affected from each query, in a stream.
155    #[inline]
156    pub fn execute_many<Adapter>(
157        self,
158
159        db_adapter: Adapter,
160    ) -> impl Stream<Item = Result<DB::QueryResult, Error>>
161    where
162        Adapter: BackendDB<'c, DB> + 'c,
163    {
164        self.fetch_many(db_adapter)
165            .try_filter_map(|step| async move {
166                Ok(match step {
167                    Either::Left(rows) => Some(rows),
168                    Either::Right(_) => None,
169                })
170            })
171    }
172    /// like sqlx::Query::fetch
173    /// Execute the query and return the generated results as a stream.
174    #[inline]
175    pub fn fetch<Adapter>(self, db_adapter: Adapter) -> impl Stream<Item = Result<DB::Row, Error>>
176    where
177        Adapter: BackendDB<'c, DB> + 'c,
178    {
179        self.fetch_many(db_adapter)
180            .try_filter_map(|step| async move {
181                Ok(match step {
182                    Either::Left(_) => None,
183                    Either::Right(row) => Some(row),
184                })
185            })
186    }
187    /// like sqlx::Query::fetch_many
188    /// Execute multiple queries and return the generated results as a stream.
189    ///
190    /// For each query in the stream, any generated rows are returned first,
191    /// then the `QueryResult` with the number of rows affected.
192    #[inline]
193    #[allow(clippy::type_complexity)]
194    pub fn fetch_many<Adapter>(
195        self,
196        db_adapter: Adapter,
197    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
198    where
199        Adapter: BackendDB<'c, DB> + 'c,
200    {
201        let template = self.template.clone();
202        let pagination_no = self.pagination_no;
203        let pagination_size = self.pagination_size;
204        Box::pin(async_stream::try_stream! {
205            let (db_type, executor) = db_adapter.backend_db().await?;
206            let f = db_type.placeholder_fn();
207            let mut sql = String::new();
208            let mut arg = template.render_with_placeholder(f, &mut sql)?;
209
210            if let (Some(pagination_no), Some(pagination_size)) = (pagination_no, pagination_size) {
211                let mut args = arg.unwrap_or_default();
212                db_type.write_pagination_sql(&mut sql, pagination_size, pagination_no, &mut args)?;
213                arg = Some(args);
214            }
215
216            let execute = SqlTemplateExecute::new(sql, arg).set_persistent(self.persistent);
217            let mut stream = execute.fetch_many(executor);
218            while let Some(item) = stream.try_next().await? {
219                yield item;
220            }
221        })
222    }
223
224    /// like sqlx::Query::fetch_all
225    /// Execute the query and return all the resulting rows collected into a [`Vec`].
226    ///
227    /// ### Note: beware result set size.
228    /// This will attempt to collect the full result set of the query into memory.
229    ///
230    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
231    /// e.g. using `LIMIT`.
232    #[inline]
233    pub async fn fetch_all<Adapter>(self, db_adapter: Adapter) -> Result<Vec<DB::Row>, Error>
234    where
235        Adapter: BackendDB<'c, DB> + 'c,
236    {
237        self.fetch(db_adapter).try_collect().await
238    }
239    /// like sqlx::Query::fetch_one
240    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
241    ///
242    /// ### Note: for best performance, ensure the query returns at most one row.
243    /// Depending on the driver implementation, if your query can return more than one row,
244    /// it may lead to wasted CPU time and bandwidth on the database server.
245    ///
246    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
247    /// can result in a more optimal query plan.
248    ///
249    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
250    ///
251    /// Otherwise, you might want to add `LIMIT 1` to your query.
252    #[inline]
253    pub async fn fetch_one<Adapter>(self, db_adapter: Adapter) -> Result<DB::Row, Error>
254    where
255        Adapter: BackendDB<'c, DB> + 'c,
256    {
257        self.fetch_optional(db_adapter)
258            .and_then(|row| match row {
259                Some(row) => future::ok(row),
260                None => future::err(Error::RowNotFound),
261            })
262            .await
263    }
264    /// like sqlx::Query::fetch_optional
265    /// Execute the query, returning the first row or `None` otherwise.
266    ///
267    /// ### Note: for best performance, ensure the query returns at most one row.
268    /// Depending on the driver implementation, if your query can return more than one row,
269    /// it may lead to wasted CPU time and bandwidth on the database server.
270    ///
271    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
272    /// can result in a more optimal query plan.
273    ///
274    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
275    ///
276    /// Otherwise, you might want to add `LIMIT 1` to your query.
277    #[inline]
278    pub async fn fetch_optional<Adapter>(
279        self,
280
281        db_adapter: Adapter,
282    ) -> Result<Option<DB::Row>, Error>
283    where
284        Adapter: BackendDB<'c, DB> + 'c,
285    {
286        let row = self.fetch_many(db_adapter).try_next().await?;
287        match row {
288            Some(Either::Right(row)) => Ok(Some(row)),
289            Some(Either::Left(_)) => Ok(None),
290            None => Ok(None),
291        }
292    }
293
294    /// like sqlx::QueryAs::fetch
295    /// Execute the query and return the generated results as a stream.
296    pub async fn fetch_as<Adapter, O>(
297        self,
298
299        db_adapter: Adapter,
300    ) -> impl Stream<Item = Result<O, Error>>
301    where
302        Adapter: BackendDB<'c, DB> + 'c,
303        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
304    {
305        self.fetch_many_as(db_adapter)
306            .try_filter_map(|step| async move { Ok(step.right()) })
307    }
308    /// like sqlx::QueryAs::fetch_many
309    /// Execute multiple queries and return the generated results as a stream
310    /// from each query, in a stream.
311    pub fn fetch_many_as<Adapter, O>(
312        self,
313        db_adapter: Adapter,
314    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
315    where
316        'q: 'e,
317        Adapter: BackendDB<'c, DB> + 'c,
318        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
319    {
320        let template = self.template.clone();
321        let pagination_no = self.pagination_no;
322        let pagination_size = self.pagination_size;
323        Box::pin(async_stream::try_stream! {
324        let (db_type, executor) = db_adapter.backend_db().await?;
325        let f = db_type.placeholder_fn();
326        let mut sql = String::new();
327        let mut arg = template.render_with_placeholder(f, &mut sql)?;
328
329        if let (Some(pagination_no), Some(pagination_size)) = (pagination_no, pagination_size) {
330            let mut args = arg.unwrap_or_default();
331            db_type.write_pagination_sql(&mut sql, pagination_size, pagination_no, &mut args)?;
332            arg = Some(args);
333        }
334
335        let execute = SqlTemplateExecute::new(sql, arg).set_persistent(self.persistent);
336        let mut stream = execute.fetch_many(executor).map(|v| match v {
337            Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
338            Ok(Either::Left(v)) => Ok(Either::Left(v)),
339            Err(e) => Err(e),
340        });
341        while let Some(item) = stream.try_next().await? {
342            yield item;
343        }
344          })
345    }
346    /// like sqlx::QueryAs::fetch_all
347    /// Execute the query and return all the resulting rows collected into a [`Vec`].
348    ///
349    /// ### Note: beware result set size.
350    /// This will attempt to collect the full result set of the query into memory.
351    ///
352    /// To avoid exhausting available memory, ensure the result set has a known upper bound,
353    /// e.g. using `LIMIT`.
354    #[inline]
355    pub async fn fetch_all_as<Adapter, O>(self, db_adapter: Adapter) -> Result<Vec<O>, Error>
356    where
357        Adapter: BackendDB<'c, DB> + 'c,
358        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
359    {
360        self.fetch_as(db_adapter).await.try_collect().await
361    }
362    /// like sqlx::QueryAs::fetch_one
363    /// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
364    ///
365    /// ### Note: for best performance, ensure the query returns at most one row.
366    /// Depending on the driver implementation, if your query can return more than one row,
367    /// it may lead to wasted CPU time and bandwidth on the database server.
368    ///
369    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
370    /// can result in a more optimal query plan.
371    ///
372    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
373    ///
374    /// Otherwise, you might want to add `LIMIT 1` to your query.
375    pub async fn fetch_one_as<Adapter, O>(self, db_adapter: Adapter) -> Result<O, Error>
376    where
377        Adapter: BackendDB<'c, DB> + 'c,
378        O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
379    {
380        self.fetch_optional_as(db_adapter)
381            .and_then(|o| match o {
382                Some(o) => future::ok(o),
383                None => future::err(Error::RowNotFound),
384            })
385            .await
386    }
387    /// like sqlx::QueryAs::fetch_optional
388    /// Execute the query, returning the first row or `None` otherwise.
389    ///
390    /// ### Note: for best performance, ensure the query returns at most one row.
391    /// Depending on the driver implementation, if your query can return more than one row,
392    /// it may lead to wasted CPU time and bandwidth on the database server.
393    ///
394    /// Even when the driver implementation takes this into account, ensuring the query returns at most one row
395    /// can result in a more optimal query plan.
396    ///
397    /// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
398    ///
399    /// Otherwise, you might want to add `LIMIT 1` to your query.
400    pub async fn fetch_optional_as<Adapter, O>(
401        self,
402
403        db_adapter: Adapter,
404    ) -> Result<Option<O>, Error>
405    where
406        Adapter: BackendDB<'c, DB> + 'c,
407        O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
408    {
409        let row = self.fetch_many_as(db_adapter).try_next().await?;
410        match row {
411            Some(Either::Right(o)) => Ok(Some(o)),
412            Some(Either::Left(_)) => Ok(None),
413            None => Ok(None),
414        }
415    }
416}