1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::SqlTemplate;
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10 types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_templte_execute::SqlTemplateExecute};
14
/// Pagination summary derived from a total record count and a page size.
#[derive(Debug)]
pub struct PageInfo {
    /// Total number of records.
    pub total: i64,
    /// Number of records per page, as supplied by the caller.
    pub page_size: i64,
    /// Number of pages needed to hold `total` records (last page may be partial).
    pub page_count: i64,
}

impl PageInfo {
    /// Builds a [`PageInfo`], computing `page_count` as `total / page_size`
    /// rounded up.
    ///
    /// A non-positive `page_size` cannot describe any page, so `page_count`
    /// is reported as `0` in that case instead of panicking on a division by
    /// zero (or overflowing on `i64::MIN / -1`).
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if page_size <= 0 {
            0
        } else {
            let full_pages = total / page_size;
            // A positive remainder means one extra, partially filled page.
            if total % page_size > 0 {
                full_pages + 1
            } else {
                full_pages
            }
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
44pub struct DBAdapterManager<'q, DB, T>
51where
52 DB: Database,
53 T: SqlTemplate<'q, DB>,
54{
55 pub(crate) sql_buff: &'q mut String,
56 pub(crate) template: T,
57 persistent: bool,
58 _p: PhantomData<&'q DB>,
59 page_size: Option<i64>,
60 page_no: Option<i64>,
61}
62impl<'q, DB, T> DBAdapterManager<'q, DB, T>
63where
64 DB: Database,
65 T: SqlTemplate<'q, DB>,
66{
67 pub fn new(template: T, sql_buff: &'q mut String) -> Self {
73 Self {
74 sql_buff,
75 template,
76 persistent: true,
77 page_no: None,
78 page_size: None,
79 _p: PhantomData,
80 }
81 }
82}
83impl<'q, DB, T> DBAdapterManager<'q, DB, T>
84where
85 DB: Database,
86 T: SqlTemplate<'q, DB>,
87 i64: Encode<'q, DB> + Type<DB>,
88{
89 pub fn set_persistent(mut self, persistent: bool) -> Self {
91 self.persistent = persistent;
92 self
93 }
94 #[inline]
99 pub async fn count<'c, Adapter>(self, db_adapter: Adapter) -> Result<i64, Error>
100 where
101 Adapter: BackendDB<'c, DB>,
102 (i64,): for<'r> FromRow<'r, DB::Row>,
103 'q: 'c,
104 {
105 let sql_buff = self.sql_buff;
106 let (mut sql, arg, db_type, executor) =
107 Self::render_adapter_sql(self.template, db_adapter, None, None).await?;
108
109 db_type.write_count_sql(&mut sql);
110 *sql_buff = sql;
111 let execute = SqlTemplateExecute::new(&*sql_buff, arg).set_persistent(self.persistent);
112 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
113 Ok(count)
114 }
115 #[inline]
121 pub async fn count_page<'c, Adapter>(
122 self,
123
124 page_size: i64,
125 db_adapter: Adapter,
126 ) -> Result<PageInfo, Error>
127 where
128 Adapter: BackendDB<'c, DB>,
129 (i64,): for<'r> FromRow<'r, DB::Row>,
130 'q: 'c,
131 {
132 let count = self.count(db_adapter).await?;
133
134 Ok(PageInfo::new(count, page_size))
135 }
136 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
138 self.page_no = Some(page_no);
139 self.page_size = Some(page_size);
140 self
141 }
142 #[inline]
144 pub async fn render_adapter_sql<'c, Adapter>(
145 template: T,
146
147 db_adapter: Adapter,
148 page_no: Option<i64>,
149 page_size: Option<i64>,
150 ) -> Result<
151 (
152 String,
153 Option<DB::Arguments<'q>>,
154 impl DatabaseDialect,
155 impl Executor<'c, Database = DB>,
156 ),
157 Error,
158 >
159 where
160 'q: 'c,
161 Adapter: BackendDB<'c, DB>,
162 {
163 let (db_type, executor) = db_adapter.backend_db().await?;
164 let f = db_type.get_encode_placeholder_fn();
165 let mut sql = String::new();
166 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
167
168 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
169 let mut args = arg.unwrap_or_default();
170 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
171 arg = Some(args);
172 }
173 Ok((sql, arg, db_type, executor))
174 }
175
176 #[inline]
179 pub async fn execute<'c, 'e, Adapter>(
180 self,
181 db_adapter: Adapter,
182 ) -> Result<DB::QueryResult, Error>
183 where
184 'q: 'c,
185 Adapter: BackendDB<'c, DB>,
186 {
187 self.execute_many(db_adapter).await.try_collect().await
188 }
189 #[inline]
192 pub async fn execute_many<'c, 'e, Adapter>(
193 self,
194
195 db_adapter: Adapter,
196 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
197 where
198 'q: 'c,
199 'c: 'e,
200 Adapter: BackendDB<'c, DB>,
201 {
202 self.fetch_many(db_adapter)
203 .await
204 .try_filter_map(|step| async move {
205 Ok(match step {
206 Either::Left(rows) => Some(rows),
207 Either::Right(_) => None,
208 })
209 })
210 .boxed()
211 }
212 #[inline]
215 pub async fn fetch<'c, 'e, Adapter>(
216 self,
217
218 db_adapter: Adapter,
219 ) -> BoxStream<'e, Result<DB::Row, Error>>
220 where
221 'q: 'c,
222 'c: 'e,
223 Adapter: BackendDB<'c, DB>,
224 {
225 self.fetch_many(db_adapter)
226 .await
227 .try_filter_map(|step| async move {
228 Ok(match step {
229 Either::Left(_) => None,
230 Either::Right(row) => Some(row),
231 })
232 })
233 .boxed()
234 }
235 #[inline]
241 #[allow(clippy::type_complexity)]
242 pub async fn fetch_many<'c, 'e, Adapter>(
243 self,
244 db_adapter: Adapter,
245 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
246 where
247 'q: 'c,
248 'c: 'e,
249 Adapter: BackendDB<'c, DB>,
250 {
251 let sql_buff = self.sql_buff;
252 let res =
253 Self::render_adapter_sql(self.template, db_adapter, self.page_no, self.page_size).await;
254
255 match res {
256 Ok((sql, arg, _db_type, executor)) => {
257 *sql_buff = sql;
258 let execute =
259 SqlTemplateExecute::new(&*sql_buff, arg).set_persistent(self.persistent);
260 execute.fetch_many(executor)
261 }
262 Err(e) => stream::once(async move { Err(e) }).boxed(),
263 }
264 }
265
266 #[inline]
275 pub async fn fetch_all<'c, Adapter>(self, db_adapter: Adapter) -> Result<Vec<DB::Row>, Error>
276 where
277 'q: 'c,
278 Adapter: BackendDB<'c, DB>,
279 {
280 self.fetch(db_adapter).await.try_collect().await
281 }
282 #[inline]
296 pub async fn fetch_one<'c, Adapter>(self, db_adapter: Adapter) -> Result<DB::Row, Error>
297 where
298 'q: 'c,
299 Adapter: BackendDB<'c, DB>,
300 {
301 self.fetch_optional(db_adapter)
302 .await
303 .and_then(|row| match row {
304 Some(row) => Ok(row),
305 None => Err(Error::RowNotFound),
306 })
307 }
308 #[inline]
322 pub async fn fetch_optional<'c, Adapter>(
323 self,
324
325 db_adapter: Adapter,
326 ) -> Result<Option<DB::Row>, Error>
327 where
328 'q: 'c,
329 Adapter: BackendDB<'c, DB>,
330 {
331 self.fetch(db_adapter).await.try_next().await
332 }
333
334 pub async fn fetch_as<'c, 'e, Adapter, O>(
337 self,
338
339 db_adapter: Adapter,
340 ) -> BoxStream<'e, Result<O, Error>>
341 where
342 'q: 'c,
343 'c: 'e,
344 Adapter: BackendDB<'c, DB>,
345 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
346 {
347 self.fetch_many_as(db_adapter)
348 .await
349 .try_filter_map(|step| async move { Ok(step.right()) })
350 .boxed()
351 }
352 pub async fn fetch_many_as<'c, 'e, Adapter, O>(
356 self,
357 db_adapter: Adapter,
358 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
359 where
360 'q: 'c,
361 'c: 'e,
362 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
363 Adapter: BackendDB<'c, DB>,
364 {
365 self.fetch_many(db_adapter)
366 .await
367 .map(|v| match v {
368 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
369 Ok(Either::Left(v)) => Ok(Either::Left(v)),
370 Err(e) => Err(e),
371 })
372 .boxed()
373 }
374 #[inline]
383 pub async fn fetch_all_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<Vec<O>, Error>
384 where
385 'q: 'c,
386 Adapter: BackendDB<'c, DB>,
387 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
388 {
389 self.fetch_as(db_adapter).await.try_collect().await
390 }
391 pub async fn fetch_one_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<O, Error>
405 where
406 'q: 'c,
407 Adapter: BackendDB<'c, DB>,
408 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
409 {
410 self.fetch_optional_as(db_adapter)
411 .await
412 .and_then(|row| row.ok_or(Error::RowNotFound))
413 }
414 pub async fn fetch_optional_as<'c, Adapter, O>(
428 self,
429
430 db_adapter: Adapter,
431 ) -> Result<Option<O>, Error>
432 where
433 'q: 'c,
434 Adapter: BackendDB<'c, DB>,
435 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
436 {
437 let row = self.fetch_optional(db_adapter).await?;
438 if let Some(row) = row {
439 O::from_row(&row).map(Some)
440 } else {
441 Ok(None)
442 }
443 }
444}