1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryStreamExt, stream};
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10 types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
14
15#[derive(Debug)]
17pub struct PageInfo {
18 pub total: i64,
20 pub page_size: i64,
22 pub page_count: i64,
24}
25
26impl PageInfo {
27 pub fn new(total: i64, page_size: i64) -> PageInfo {
33 let mut page_count = total / page_size;
34 if total % page_size > 0 {
35 page_count += 1;
36 }
37 Self {
38 total,
39 page_size,
40 page_count,
41 }
42 }
43}
44pub struct DBAdapterManager<'q, DB, T>
51where
52 DB: Database,
53 T: SqlTemplate<'q, DB>,
54{
55 pub(crate) sql: String,
56 pub(crate) template: T,
57 persistent: bool,
58 _p: PhantomData<&'q DB>,
59 page_size: Option<i64>,
60 page_no: Option<i64>,
61}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65 DB: Database,
66 T: SqlTemplate<'q, DB>,
67{
68 pub fn new(template: T) -> Self {
73 Self {
74 sql: String::new(),
75 template,
76 persistent: true,
77 page_no: None,
78 page_size: None,
79 _p: PhantomData,
80 }
81 }
82
83 pub fn sql(&self) -> &String {
84 &self.sql
85 }
86}
87impl<'q, DB, T> DBAdapterManager<'q, DB, T>
88where
89 DB: Database,
90 T: SqlTemplate<'q, DB> + Send + 'q,
91 i64: Encode<'q, DB> + Type<DB>,
92{
93 pub fn set_persistent(mut self, persistent: bool) -> Self {
95 self.persistent = persistent;
96 self
97 }
98 #[inline]
103 pub async fn count<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<i64, Error>
104 where
105 (i64,): for<'r> FromRow<'r, DB::Row>,
106 Adapter: BackendDB<'q, DB> + 'q,
107 {
108 let (mut sql, arg, db_type, executor) = Self::render_sql_with_adapter(
109 self.template.clone(),
110 db_adapter,
111 self.page_no,
112 self.page_size,
113 )
114 .await?;
115
116 db_type.write_count_sql(&mut sql);
117 self.sql = sql;
118 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
119 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
120 Ok(count)
121 }
122 #[inline]
128 pub async fn count_page<Adapter>(
129 &'q mut self,
130
131 page_size: i64,
132 db_adapter: Adapter,
133 ) -> Result<PageInfo, Error>
134 where
135 Adapter: BackendDB<'q, DB> + 'q,
136 (i64,): for<'r> FromRow<'r, DB::Row>,
137 {
138 let count = self.count(db_adapter).await?;
139
140 Ok(PageInfo::new(count, page_size))
141 }
142 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
144 self.page_no = Some(page_no);
145 self.page_size = Some(page_size);
146 self
147 }
148 #[allow(clippy::type_complexity)]
150 #[inline]
151 pub fn render_sql_with_adapter<Adapter>(
152 template: T,
153
154 db_adapter: Adapter,
155 page_no: Option<i64>,
156 page_size: Option<i64>,
157 ) -> BoxFuture<
158 'q,
159 Result<
160 (
161 String,
162 Option<DB::Arguments<'q>>,
163 impl DatabaseDialect,
164 impl Executor<'q, Database = DB> + 'q,
165 ),
166 Error,
167 >,
168 >
169 where
170 Adapter: BackendDB<'q, DB> + 'q,
171 {
172 async move {
173 let (db_type, executor) = db_adapter.backend_db().await?;
174 let f = db_type.get_encode_placeholder_fn();
175 let mut sql = String::new();
176 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
177
178 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
179 let mut args = arg.unwrap_or_default();
180 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
181 arg = Some(args);
182 }
183 Ok((sql, arg, db_type, executor))
184 }
185 .boxed()
186 }
187
188 #[inline]
191 pub async fn execute<'c, Adapter>(
192 &'q mut self,
193 db_adapter: Adapter,
194 ) -> Result<DB::QueryResult, Error>
195 where
196 Adapter: BackendDB<'q, DB> + 'q,
197 {
198 self.execute_many(db_adapter).await.try_collect().await
199 }
200 #[inline]
203 pub async fn execute_many<'e, Adapter>(
204 &'q mut self,
205
206 db_adapter: Adapter,
207 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
208 where
209 'q: 'e,
210 Adapter: BackendDB<'q, DB> + 'q,
211 {
212 self.fetch_many(db_adapter)
213 .await
214 .try_filter_map(|step| async move {
215 Ok(match step {
216 Either::Left(rows) => Some(rows),
217 Either::Right(_) => None,
218 })
219 })
220 .boxed()
221 }
222 #[inline]
225 pub async fn fetch<'e, Adapter>(
226 &'q mut self,
227
228 db_adapter: Adapter,
229 ) -> BoxStream<'e, Result<DB::Row, Error>>
230 where
231 Adapter: BackendDB<'q, DB> + 'q,
232 'q: 'e,
233 {
234 self.fetch_many(db_adapter)
235 .await
236 .try_filter_map(|step| async move {
237 Ok(match step {
238 Either::Left(_) => None,
239 Either::Right(row) => Some(row),
240 })
241 })
242 .boxed()
243 }
244 #[inline]
250 #[allow(clippy::type_complexity)]
251 pub async fn fetch_many<'e, Adapter>(
252 &'q mut self,
253 db_adapter: Adapter,
254 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
255 where
256 'q: 'e,
257 Adapter: BackendDB<'q, DB> + 'q,
258 {
259 let res = Self::render_sql_with_adapter(
260 self.template.clone(),
261 db_adapter,
262 self.page_no,
263 self.page_size,
264 )
265 .await;
266
267 match res {
268 Ok((sql, arg, _db_type, executor)) => {
269 self.sql = sql;
270 let execute =
271 SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
272 execute.fetch_many(executor)
273 }
274 Err(e) => stream::once(async move { Err(e) }).boxed(),
275 }
276 }
277
278 #[inline]
287 pub async fn fetch_all<C, Adapter>(
288 &'q mut self,
289 db_adapter: Adapter,
290 ) -> Result<Vec<DB::Row>, Error>
291 where
292 Adapter: BackendDB<'q, DB> + 'q,
293 {
294 self.fetch(db_adapter).await.try_collect().await
295 }
296 #[inline]
310 pub async fn fetch_one<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
311 where
312 Adapter: BackendDB<'q, DB> + 'q,
313 {
314 self.fetch_optional(db_adapter)
315 .await
316 .and_then(|row| match row {
317 Some(row) => Ok(row),
318 None => Err(Error::RowNotFound),
319 })
320 }
321 #[inline]
335 pub async fn fetch_optional<Adapter>(
336 &'q mut self,
337
338 db_adapter: Adapter,
339 ) -> Result<Option<DB::Row>, Error>
340 where
341 Adapter: BackendDB<'q, DB> + 'q,
342 {
343 self.fetch(db_adapter).await.try_next().await
344 }
345
346 pub async fn fetch_as<'e, Adapter, O>(
349 &'q mut self,
350
351 db_adapter: Adapter,
352 ) -> BoxStream<'e, Result<O, Error>>
353 where
354 'q: 'e,
355 Adapter: BackendDB<'q, DB> + 'q,
356 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
357 {
358 self.fetch_many_as(db_adapter)
359 .await
360 .try_filter_map(|step| async move { Ok(step.right()) })
361 .boxed()
362 }
363 pub async fn fetch_many_as<'e, Adapter, O>(
367 &'q mut self,
368 db_adapter: Adapter,
369 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
370 where
371 'q: 'e,
372 Adapter: BackendDB<'q, DB> + 'q,
373 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
374 {
375 self.fetch_many(db_adapter)
376 .await
377 .map(|v| match v {
378 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
379 Ok(Either::Left(v)) => Ok(Either::Left(v)),
380 Err(e) => Err(e),
381 })
382 .boxed()
383 }
384 #[inline]
393 pub async fn fetch_all_as<Adapter, O>(
394 &'q mut self,
395 db_adapter: Adapter,
396 ) -> Result<Vec<O>, Error>
397 where
398 Adapter: BackendDB<'q, DB> + 'q,
399 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
400 {
401 self.fetch_as(db_adapter).await.try_collect().await
402 }
403 pub async fn fetch_one_as<Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
417 where
418 Adapter: BackendDB<'q, DB> + 'q,
419 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
420 {
421 self.fetch_optional_as(db_adapter)
422 .await
423 .and_then(|row| row.ok_or(Error::RowNotFound))
424 }
425 pub async fn fetch_optional_as<Adapter, O>(
439 &'q mut self,
440
441 db_adapter: Adapter,
442 ) -> Result<Option<O>, Error>
443 where
444 Adapter: BackendDB<'q, DB> + 'q,
445 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
446 {
447 let row = self.fetch_optional(db_adapter).await?;
448 if let Some(row) = row {
449 O::from_row(&row).map(Some)
450 } else {
451 Ok(None)
452 }
453 }
454}