1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::SqlTemplate;
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10 pool::PoolConnection, try_stream, types::Type,
11};
12
13use super::{
14 db_type::{DBBackend, DBType},
15 sql_templte_execute::SqlTemplateExecute,
16};
17
18#[derive(Debug)]
20pub struct PageInfo {
21 pub total: i64,
23 pub page_size: i64,
25 pub page_count: i64,
27}
28
29impl PageInfo {
30 pub fn new(total: i64, page_size: i64) -> PageInfo {
36 let mut page_count = total / page_size;
37 if total % page_size > 0 {
38 page_count += 1;
39 }
40 Self {
41 total,
42 page_size,
43 page_count,
44 }
45 }
46}
47pub struct DBAdapterManager<'q, DB, T>
54where
55 DB: Database,
56 T: SqlTemplate<'q, DB>,
57{
58 pub(crate) sql_buff: &'q mut String,
59 pub(crate) template: T,
60 persistent: bool,
61 _p: PhantomData<&'q DB>,
62 page_size: Option<i64>,
63 page_no: Option<i64>,
64}
65impl<'q, DB, T> DBAdapterManager<'q, DB, T>
66where
67 DB: Database,
68 T: SqlTemplate<'q, DB>,
69{
70 pub fn new(template: T, sql_buff: &'q mut String) -> Self {
76 Self {
77 sql_buff,
78 template,
79 persistent: true,
80 page_no: None,
81 page_size: None,
82 _p: PhantomData,
83 }
84 }
85}
86impl<'q, DB, T> DBAdapterManager<'q, DB, T>
87where
88 DB: Database,
89 T: SqlTemplate<'q, DB>,
90 i64: Encode<'q, DB> + Type<DB>,
91 for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
92{
93 pub fn set_persistent(mut self, persistent: bool) -> Self {
95 self.persistent = persistent;
96 self
97 }
98 #[inline]
103 pub async fn count<'c, Adapter>(self, db_adapter: Adapter) -> Result<i64, Error>
104 where
105 Adapter: DBBackend<'c, DB>,
106 (i64,): for<'r> FromRow<'r, DB::Row>,
107 'q: 'c,
108 {
109 let sql_buff = self.sql_buff;
110 let (mut sql, arg, db_type, executor) =
111 Self::render_adapter_sql(self.template, db_adapter, None, None).await?;
112
113 db_type.write_count_sql(&mut sql);
114 *sql_buff = sql;
115 let execute = SqlTemplateExecute::new(&*sql_buff, arg).set_persistent(self.persistent);
116 match executor {
117 Either::Left(conn) => {
118 let (count,): (i64,) = execute.fetch_one_as(conn).await?;
119
120 Ok(count)
121 }
122 Either::Right(mut conn) => {
123 let (count,): (i64,) = execute.fetch_one_as(&mut *conn).await?;
124
125 Ok(count)
126 }
127 }
128 }
129 #[inline]
135 pub async fn count_page<'c, Adapter>(
136 self,
137
138 page_size: i64,
139 db_adapter: Adapter,
140 ) -> Result<PageInfo, Error>
141 where
142 Adapter: DBBackend<'c, DB>,
143 (i64,): for<'r> FromRow<'r, DB::Row>,
144 'q: 'c,
145 {
146 let count = self.count(db_adapter).await?;
147
148 Ok(PageInfo::new(count, page_size))
149 }
150 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
152 self.page_no = Some(page_no);
153 self.page_size = Some(page_size);
154 self
155 }
156 #[inline]
158 pub async fn render_adapter_sql<'c, Adapter>(
159 template: T,
160
161 db_adapter: Adapter,
162 page_no: Option<i64>,
163 page_size: Option<i64>,
164 ) -> Result<
165 (
166 String,
167 Option<DB::Arguments<'q>>,
168 DBType,
169 Either<impl Executor<'c, Database = DB>, PoolConnection<DB>>,
170 ),
171 Error,
172 >
173 where
174 'q: 'c,
175 Adapter: DBBackend<'c, DB>,
176 {
177 let (db_type, executor) = db_adapter.backend_db().await?;
178 let f = db_type.get_encode_placeholder_fn();
179 let mut sql = String::new();
180 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
181
182 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
183 let mut args = arg.unwrap_or_default();
184 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
185 arg = Some(args);
186 }
187 Ok((sql, arg, db_type, executor))
188 }
189
190 #[inline]
193 pub async fn execute<'c, 'e, Adapter>(
194 self,
195 db_adapter: Adapter,
196 ) -> Result<DB::QueryResult, Error>
197 where
198 'q: 'c,
199 Adapter: DBBackend<'c, DB>,
200 {
201 self.execute_many(db_adapter).await.try_collect().await
202 }
203 #[inline]
206 pub async fn execute_many<'c, 'e, Adapter>(
207 self,
208
209 db_adapter: Adapter,
210 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
211 where
212 'q: 'c,
213 'c: 'e,
214 Adapter: DBBackend<'c, DB>,
215 {
216 self.fetch_many(db_adapter)
217 .await
218 .try_filter_map(|step| async move {
219 Ok(match step {
220 Either::Left(rows) => Some(rows),
221 Either::Right(_) => None,
222 })
223 })
224 .boxed()
225 }
226 #[inline]
229 pub async fn fetch<'c, 'e, Adapter>(
230 self,
231
232 db_adapter: Adapter,
233 ) -> BoxStream<'e, Result<DB::Row, Error>>
234 where
235 'q: 'c,
236 'c: 'e,
237 Adapter: DBBackend<'c, DB>,
238 {
239 self.fetch_many(db_adapter)
240 .await
241 .try_filter_map(|step| async move {
242 Ok(match step {
243 Either::Left(_) => None,
244 Either::Right(row) => Some(row),
245 })
246 })
247 .boxed()
248 }
249 #[inline]
255 #[allow(clippy::type_complexity)]
256 pub async fn fetch_many<'c, 'e, Adapter>(
257 self,
258 db_adapter: Adapter,
259 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
260 where
261 'q: 'c,
262 'c: 'e,
263 Adapter: DBBackend<'c, DB>,
264 {
265 let sql_buff = self.sql_buff;
266 let res =
267 Self::render_adapter_sql(self.template, db_adapter, self.page_no, self.page_size).await;
268
269 match res {
270 Ok((sql, arg, _db_type, executor)) => {
271 *sql_buff = sql;
272 let execute =
273 SqlTemplateExecute::new(&*sql_buff, arg).set_persistent(self.persistent);
274
275 match executor {
276 Either::Left(conn) => execute.fetch_many(conn),
277 Either::Right(mut conn) => Box::pin(try_stream! {
278
279 let mut s = conn.fetch_many(execute);
280
281 while let Some(v) = s.try_next().await? {
282 r#yield!(v);
283 }
284
285 Ok(())
286 }),
287 }
288 }
289 Err(e) => stream::once(async move { Err(e) }).boxed(),
290 }
291 }
292
293 #[inline]
302 pub async fn fetch_all<'c, Adapter>(self, db_adapter: Adapter) -> Result<Vec<DB::Row>, Error>
303 where
304 'q: 'c,
305 Adapter: DBBackend<'c, DB>,
306 {
307 self.fetch(db_adapter).await.try_collect().await
308 }
309 #[inline]
323 pub async fn fetch_one<'c, Adapter>(self, db_adapter: Adapter) -> Result<DB::Row, Error>
324 where
325 'q: 'c,
326 Adapter: DBBackend<'c, DB>,
327 {
328 self.fetch_optional(db_adapter)
329 .await
330 .and_then(|row| match row {
331 Some(row) => Ok(row),
332 None => Err(Error::RowNotFound),
333 })
334 }
335 #[inline]
349 pub async fn fetch_optional<'c, Adapter>(
350 self,
351
352 db_adapter: Adapter,
353 ) -> Result<Option<DB::Row>, Error>
354 where
355 'q: 'c,
356 Adapter: DBBackend<'c, DB>,
357 {
358 self.fetch(db_adapter).await.try_next().await
359 }
360
361 pub async fn fetch_as<'c, 'e, Adapter, O>(
364 self,
365
366 db_adapter: Adapter,
367 ) -> BoxStream<'e, Result<O, Error>>
368 where
369 'q: 'c,
370 'c: 'e,
371 Adapter: DBBackend<'c, DB>,
372 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
373 {
374 self.fetch_many_as(db_adapter)
375 .await
376 .try_filter_map(|step| async move { Ok(step.right()) })
377 .boxed()
378 }
379 pub async fn fetch_many_as<'c, 'e, Adapter, O>(
383 self,
384
385 db_adapter: Adapter,
386 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
387 where
388 'q: 'c,
389 'c: 'e,
390 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
391 Adapter: DBBackend<'c, DB>,
392 {
393 self.fetch_many(db_adapter)
394 .await
395 .map(|v| match v {
396 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
397 Ok(Either::Left(v)) => Ok(Either::Left(v)),
398 Err(e) => Err(e),
399 })
400 .boxed()
401 }
402 #[inline]
411 pub async fn fetch_all_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<Vec<O>, Error>
412 where
413 'q: 'c,
414 Adapter: DBBackend<'c, DB>,
415 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
416 {
417 self.fetch_as(db_adapter).await.try_collect().await
418 }
419 pub async fn fetch_one_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<O, Error>
433 where
434 'q: 'c,
435 Adapter: DBBackend<'c, DB>,
436 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
437 {
438 self.fetch_optional_as(db_adapter)
439 .await
440 .and_then(|row| row.ok_or(Error::RowNotFound))
441 }
442 pub async fn fetch_optional_as<'c, Adapter, O>(
456 self,
457
458 db_adapter: Adapter,
459 ) -> Result<Option<O>, Error>
460 where
461 'q: 'c,
462 Adapter: DBBackend<'c, DB>,
463 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
464 {
465 let row = self.fetch_optional(db_adapter).await?;
466 if let Some(row) = row {
467 O::from_row(&row).map(Some)
468 } else {
469 Ok(None)
470 }
471 }
472}