1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::{AdapterExecutor, DBType, SqlTemplate};
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10 types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
14
/// Pagination summary: a total record count and how it splits into pages.
#[derive(Debug)]
pub struct PageInfo {
    /// Total number of records, stored exactly as passed to [`PageInfo::new`].
    pub total: i64,
    /// Number of records per page, stored exactly as passed to [`PageInfo::new`].
    pub page_size: i64,
    /// Number of pages needed to hold `total` records (`total / page_size`, rounded up).
    pub page_count: i64,
}

impl PageInfo {
    /// Builds a `PageInfo`, deriving `page_count` as `total / page_size` rounded up.
    ///
    /// `total` and `page_size` are stored unchanged. When either of them is zero or
    /// negative there is nothing meaningful to paginate, so `page_count` is `0`
    /// instead of panicking on a division by zero (or on `i64::MIN / -1`) or
    /// producing a negative page count.
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if total <= 0 || page_size <= 0 {
            0
        } else {
            // Both operands are strictly positive here, so the division cannot
            // panic and `(total - 1) / page_size + 1` is the exact ceiling.
            (total - 1) / page_size + 1
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
44pub struct DBAdapterManager<'q, DB, T>
51where
52 DB: Database,
53 T: SqlTemplate<'q, DB>,
54{
55 pub(crate) sql: String,
56 pub(crate) template: T,
57 persistent: bool,
58 _p: PhantomData<&'q DB>,
59 page_size: Option<i64>,
60 page_no: Option<i64>,
61}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65 DB: Database,
66 T: SqlTemplate<'q, DB>,
67{
68 pub fn new(template: T) -> Self {
73 Self {
74 sql: String::new(),
75 template,
76 persistent: true,
77 page_no: None,
78 page_size: None,
79 _p: PhantomData,
80 }
81 }
82
83 pub fn sql(&self) -> &String {
84 &self.sql
85 }
86}
87impl<'q, DB, T> DBAdapterManager<'q, DB, T>
88where
89 DB: Database,
90 T: SqlTemplate<'q, DB>,
91 i64: Encode<'q, DB> + Type<DB>,
92{
93 pub fn set_persistent(mut self, persistent: bool) -> Self {
95 self.persistent = persistent;
96 self
97 }
98 #[inline]
103 pub async fn count<'c, C, Adapter>(&'q mut self, db_adapter: Adapter) -> Result<i64, Error>
104 where
105 C: Executor<'c, Database = DB> + 'c,
106 Adapter: BackendDB<'c, DB, C>,
107 (i64,): for<'r> FromRow<'r, DB::Row>,
108 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
109 {
110 let (mut sql, arg, db_type, executor) = Self::render_sql_with_adapter(
111 self.template.clone(),
112 db_adapter,
113 self.page_no,
114 self.page_size,
115 )
116 .await?;
117
118 db_type.write_count_sql(&mut sql);
119 self.sql = sql;
120 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
121 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
122 Ok(count)
123 }
124 #[inline]
130 pub async fn count_page<'c, C, Adapter>(
131 &'q mut self,
132
133 page_size: i64,
134 db_adapter: Adapter,
135 ) -> Result<PageInfo, Error>
136 where
137 C: Executor<'c, Database = DB> + 'c,
138 Adapter: BackendDB<'c, DB, C>,
139 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
140 (i64,): for<'r> FromRow<'r, DB::Row>,
141 {
142 let count = self.count(db_adapter).await?;
143
144 Ok(PageInfo::new(count, page_size))
145 }
146 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
148 self.page_no = Some(page_no);
149 self.page_size = Some(page_size);
150 self
151 }
152 #[inline]
154 pub async fn render_sql_with_adapter<'c, C, Adapter>(
155 template: T,
156
157 db_adapter: Adapter,
158 page_no: Option<i64>,
159 page_size: Option<i64>,
160 ) -> Result<
161 (
162 String,
163 Option<DB::Arguments<'q>>,
164 DBType,
165 AdapterExecutor<'c, DB, C>,
166 ),
167 Error,
168 >
169 where
170 C: Executor<'c, Database = DB> + 'c,
171 Adapter: BackendDB<'c, DB, C>,
172 {
173 let (db_type, executor) = db_adapter.backend_db().await?;
174 let f = db_type.get_encode_placeholder_fn();
175 let mut sql = String::new();
176 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
177
178 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
179 let mut args = arg.unwrap_or_default();
180 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
181 arg = Some(args);
182 }
183 Ok((sql, arg, db_type, executor))
184 }
185
186 #[inline]
189 pub async fn execute<'c, C, Adapter>(
190 &'q mut self,
191 db_adapter: Adapter,
192 ) -> Result<DB::QueryResult, Error>
193 where
194 C: Executor<'c, Database = DB> + 'c,
195 Adapter: BackendDB<'c, DB, C>,
196 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
197 {
198 self.execute_many(db_adapter).await.try_collect().await
199 }
200 #[inline]
203 pub async fn execute_many<'c, 'e, C, Adapter>(
204 &'q mut self,
205
206 db_adapter: Adapter,
207 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
208 where
209 'c: 'e,
210 'q: 'e,
211 C: Executor<'c, Database = DB> + 'c,
212 Adapter: BackendDB<'c, DB, C>,
213 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
214 {
215 self.fetch_many(db_adapter)
216 .await
217 .try_filter_map(|step| async move {
218 Ok(match step {
219 Either::Left(rows) => Some(rows),
220 Either::Right(_) => None,
221 })
222 })
223 .boxed()
224 }
225 #[inline]
228 pub async fn fetch<'c, 'e, C, Adapter>(
229 &'q mut self,
230
231 db_adapter: Adapter,
232 ) -> BoxStream<'e, Result<DB::Row, Error>>
233 where
234 C: Executor<'c, Database = DB> + 'c,
235 Adapter: BackendDB<'c, DB, C>,
236 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
237 'c: 'e,
238 'q: 'e,
239 {
240 self.fetch_many(db_adapter)
241 .await
242 .try_filter_map(|step| async move {
243 Ok(match step {
244 Either::Left(_) => None,
245 Either::Right(row) => Some(row),
246 })
247 })
248 .boxed()
249 }
250 #[inline]
256 #[allow(clippy::type_complexity)]
257 pub async fn fetch_many<'c, 'e, C, Adapter>(
258 &'q mut self,
259 db_adapter: Adapter,
260 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
261 where
262 'c: 'e,
263 'q: 'e,
264 C: Executor<'c, Database = DB> + 'c,
265 Adapter: BackendDB<'c, DB, C>,
266 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
267 {
268 let res = Self::render_sql_with_adapter(
269 self.template.clone(),
270 db_adapter,
271 self.page_no,
272 self.page_size,
273 )
274 .await;
275
276 match res {
277 Ok((sql, arg, _db_type, executor)) => {
278 self.sql = sql;
279 let execute =
280 SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
281 execute.fetch_many(executor)
282 }
283 Err(e) => stream::once(async move { Err(e) }).boxed(),
284 }
285 }
286
287 #[inline]
296 pub async fn fetch_all<'c, C, Adapter>(
297 &'q mut self,
298 db_adapter: Adapter,
299 ) -> Result<Vec<DB::Row>, Error>
300 where
301 C: Executor<'c, Database = DB> + 'c,
302 Adapter: BackendDB<'c, DB, C>,
303 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
304 {
305 self.fetch(db_adapter).await.try_collect().await
306 }
307 #[inline]
321 pub async fn fetch_one<'c, C, Adapter>(
322 &'q mut self,
323 db_adapter: Adapter,
324 ) -> Result<DB::Row, Error>
325 where
326 C: Executor<'c, Database = DB> + 'c,
327 Adapter: BackendDB<'c, DB, C>,
328 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
329 {
330 self.fetch_optional(db_adapter)
331 .await
332 .and_then(|row| match row {
333 Some(row) => Ok(row),
334 None => Err(Error::RowNotFound),
335 })
336 }
337 #[inline]
351 pub async fn fetch_optional<'c, C, Adapter>(
352 &'q mut self,
353
354 db_adapter: Adapter,
355 ) -> Result<Option<DB::Row>, Error>
356 where
357 C: Executor<'c, Database = DB> + 'c,
358 Adapter: BackendDB<'c, DB, C>,
359 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
360 {
361 self.fetch(db_adapter).await.try_next().await
362 }
363
364 pub async fn fetch_as<'c, 'e, C, Adapter, O>(
367 &'q mut self,
368
369 db_adapter: Adapter,
370 ) -> BoxStream<'e, Result<O, Error>>
371 where
372 'c: 'e,
373 'q: 'e,
374 C: Executor<'c, Database = DB> + 'c,
375 Adapter: BackendDB<'c, DB, C>,
376 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
377 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
378 {
379 self.fetch_many_as(db_adapter)
380 .await
381 .try_filter_map(|step| async move { Ok(step.right()) })
382 .boxed()
383 }
384 pub async fn fetch_many_as<'c, 'e, C, Adapter, O>(
388 &'q mut self,
389 db_adapter: Adapter,
390 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
391 where
392 'c: 'e,
393 'q: 'e,
394 C: Executor<'c, Database = DB> + 'c,
395 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
396 Adapter: BackendDB<'c, DB, C>,
397 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
398 {
399 self.fetch_many(db_adapter)
400 .await
401 .map(|v| match v {
402 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
403 Ok(Either::Left(v)) => Ok(Either::Left(v)),
404 Err(e) => Err(e),
405 })
406 .boxed()
407 }
408 #[inline]
417 pub async fn fetch_all_as<'c, C, Adapter, O>(
418 &'q mut self,
419 db_adapter: Adapter,
420 ) -> Result<Vec<O>, Error>
421 where
422 C: Executor<'c, Database = DB> + 'c,
423 Adapter: BackendDB<'c, DB, C>,
424 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
425 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
426 {
427 self.fetch_as(db_adapter).await.try_collect().await
428 }
429 pub async fn fetch_one_as<'c, C, Adapter, O>(
443 &'q mut self,
444 db_adapter: Adapter,
445 ) -> Result<O, Error>
446 where
447 C: Executor<'c, Database = DB> + 'c,
448 Adapter: BackendDB<'c, DB, C>,
449 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
450 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
451 {
452 self.fetch_optional_as(db_adapter)
453 .await
454 .and_then(|row| row.ok_or(Error::RowNotFound))
455 }
456 pub async fn fetch_optional_as<'c, C, Adapter, O>(
470 &'q mut self,
471
472 db_adapter: Adapter,
473 ) -> Result<Option<O>, Error>
474 where
475 C: Executor<'c, Database = DB> + 'c,
476 Adapter: BackendDB<'c, DB, C>,
477 AdapterExecutor<'c, DB, C>: Executor<'c, Database = DB> + 'c,
478 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
479 {
480 let row = self.fetch_optional(db_adapter).await?;
481 if let Some(row) = row {
482 O::from_row(&row).map(Some)
483 } else {
484 Ok(None)
485 }
486 }
487}