1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{Stream, future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future};
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, from_row::FromRow, types::Type,
10};
11
12use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
13
/// Pagination summary derived from a total row count and a page size.
#[derive(Debug, PartialEq, Eq)]
pub struct PageInfo {
    /// Total number of rows, exactly as passed to [`PageInfo::new`].
    pub total: i64,
    /// Number of rows per page, exactly as passed to [`PageInfo::new`].
    pub page_size: i64,
    /// Number of pages needed to hold `total` rows (rounded up).
    pub page_count: i64,
}

impl PageInfo {
    /// Builds a `PageInfo`, computing `page_count` as `total / page_size` rounded up.
    ///
    /// A non-positive `page_size` cannot describe any page, so `page_count` is
    /// reported as `0` in that case instead of panicking on a division by zero
    /// (or producing a meaningless negative count).
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if page_size > 0 {
            let full_pages = total / page_size;
            // A positive remainder means one extra, partially filled page.
            if total % page_size > 0 {
                full_pages + 1
            } else {
                full_pages
            }
        } else {
            0
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
43pub struct DBAdapterManager<'q, DB, T>
50where
51 DB: Database,
52 T: SqlTemplate<'q, DB>,
53{
54 pub(crate) sql: String,
55 pub(crate) template: T,
56 persistent: bool,
57 _p: PhantomData<&'q DB>,
58 page_size: Option<i64>,
59 page_no: Option<i64>,
60}
61
62impl<'q, DB, T> DBAdapterManager<'q, DB, T>
63where
64 DB: Database,
65 T: SqlTemplate<'q, DB>,
66{
67 pub fn new(template: T) -> Self {
72 Self {
73 sql: String::new(),
74 template,
75 persistent: true,
76 page_no: None,
77 page_size: None,
78 _p: PhantomData,
79 }
80 }
81
82 pub fn sql(&self) -> &String {
83 &self.sql
84 }
85}
86impl<'q, 'c, 'e, DB, T> DBAdapterManager<'q, DB, T>
87where
88 DB: Database + Sync,
89 T: SqlTemplate<'q, DB> + Send + 'q,
90 i64: Encode<'q, DB> + Type<DB>,
91 DB::Arguments<'q>: 'q,
92 'q: 'e,
93 'c: 'e,
94{
95 pub fn set_persistent(mut self, persistent: bool) -> Self {
97 self.persistent = persistent;
98 self
99 }
100 #[inline]
105 pub fn count<Adapter>(&'q mut self, db_adapter: Adapter) -> BoxFuture<'e, Result<i64, Error>>
106 where
107 Adapter: BackendDB<'c, DB> + 'c,
108 (i64,): for<'r> FromRow<'r, DB::Row>,
109 {
110 let template = self.template.clone();
111 let page_no = self.page_no;
112 let page_size = self.page_size;
113 async move {
114 let (db_type, executor) = db_adapter.backend_db().await?;
115 let f = db_type.get_encode_placeholder_fn();
116 let mut sql = String::new();
117 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
118
119 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
120 let mut args = arg.unwrap_or_default();
121 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
122 arg = Some(args);
123 }
124
125 db_type.write_count_sql(&mut sql);
126 self.sql = sql;
127 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
128 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
129 Ok(count)
130 }
131 .boxed()
132 }
133 #[inline]
139 pub async fn count_page<Adapter>(
140 &'q mut self,
141 page_size: i64,
142 db_adapter: Adapter,
143 ) -> Result<PageInfo, Error>
144 where
145 Adapter: BackendDB<'c, DB> + 'c,
146 (i64,): for<'r> FromRow<'r, DB::Row>,
147 {
148 let count = self.count(db_adapter).await?;
149 Ok(PageInfo::new(count, page_size))
150 }
151 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
153 self.page_no = Some(page_no);
154 self.page_size = Some(page_size);
155 self
156 }
157
158 #[inline]
161 pub async fn execute<Adapter>(
162 &'q mut self,
163 db_adapter: Adapter,
164 ) -> Result<DB::QueryResult, Error>
165 where
166 Adapter: BackendDB<'c, DB> + 'c,
167 {
168 self.execute_many(db_adapter).try_collect().await
169 }
170 #[inline]
173 pub fn execute_many<Adapter>(
174 &'q mut self,
175
176 db_adapter: Adapter,
177 ) -> impl Stream<Item = Result<DB::QueryResult, Error>>
178 where
179 Adapter: BackendDB<'c, DB> + 'c,
180 {
181 self.fetch_many(db_adapter)
182 .try_filter_map(|step| async move {
183 Ok(match step {
184 Either::Left(rows) => Some(rows),
185 Either::Right(_) => None,
186 })
187 })
188 }
189 #[inline]
192 pub fn fetch<Adapter>(
193 &'q mut self,
194 db_adapter: Adapter,
195 ) -> impl Stream<Item = Result<DB::Row, Error>>
196 where
197 Adapter: BackendDB<'c, DB> + 'c,
198 {
199 self.fetch_many(db_adapter)
200 .try_filter_map(|step| async move {
201 Ok(match step {
202 Either::Left(_) => None,
203 Either::Right(row) => Some(row),
204 })
205 })
206 }
207 #[inline]
213 #[allow(clippy::type_complexity)]
214 pub fn fetch_many<Adapter>(
215 &'q mut self,
216 db_adapter: Adapter,
217 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
218 where
219 Adapter: BackendDB<'c, DB> + 'c,
220 {
221 let template = self.template.clone();
222 let page_no = self.page_no;
223 let page_size = self.page_size;
224 Box::pin(async_stream::try_stream! {
225 let (db_type, executor) = db_adapter.backend_db().await?;
226 let f = db_type.get_encode_placeholder_fn();
227 let mut sql = String::new();
228 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
229
230 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
231 let mut args = arg.unwrap_or_default();
232 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
233 arg = Some(args);
234 }
235
236 self.sql = sql;
237 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
238 let mut stream = execute.fetch_many(executor);
239 while let Some(item) = stream.try_next().await? {
240 yield item;
241 }
242 })
243 }
244
245 #[inline]
254 pub async fn fetch_all<Adapter>(
255 &'q mut self,
256 db_adapter: Adapter,
257 ) -> Result<Vec<DB::Row>, Error>
258 where
259 Adapter: BackendDB<'c, DB> + 'c,
260 {
261 self.fetch(db_adapter).try_collect().await
262 }
263 #[inline]
277 pub async fn fetch_one<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
278 where
279 Adapter: BackendDB<'c, DB> + 'c,
280 {
281 self.fetch_optional(db_adapter)
282 .and_then(|row| match row {
283 Some(row) => future::ok(row),
284 None => future::err(Error::RowNotFound),
285 })
286 .await
287 }
288 #[inline]
302 pub async fn fetch_optional<Adapter>(
303 &'q mut self,
304
305 db_adapter: Adapter,
306 ) -> Result<Option<DB::Row>, Error>
307 where
308 Adapter: BackendDB<'c, DB> + 'c,
309 {
310 let row = self.fetch_many(db_adapter).try_next().await?;
311 match row {
312 Some(Either::Right(row)) => Ok(Some(row)),
313 Some(Either::Left(_)) => Ok(None),
314 None => Ok(None),
315 }
316 }
317
318 pub async fn fetch_as<Adapter, O>(
321 &'q mut self,
322
323 db_adapter: Adapter,
324 ) -> impl Stream<Item = Result<O, Error>>
325 where
326 Adapter: BackendDB<'c, DB> + 'c,
327 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
328 {
329 self.fetch_many_as(db_adapter)
330 .try_filter_map(|step| async move { Ok(step.right()) })
331 }
332 pub fn fetch_many_as<Adapter, O>(
336 &'q mut self,
337 db_adapter: Adapter,
338 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
339 where
340 'q: 'e,
341 Adapter: BackendDB<'c, DB> + 'c,
342 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
343 {
344 let template = self.template.clone();
345 let page_no = self.page_no;
346 let page_size = self.page_size;
347 Box::pin(async_stream::try_stream! {
348 let (db_type, executor) = db_adapter.backend_db().await?;
349 let f = db_type.get_encode_placeholder_fn();
350 let mut sql = String::new();
351 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
352
353 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
354 let mut args = arg.unwrap_or_default();
355 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
356 arg = Some(args);
357 }
358
359 self.sql = sql;
360 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
361 let mut stream = execute.fetch_many(executor).map(|v| match v {
362 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
363 Ok(Either::Left(v)) => Ok(Either::Left(v)),
364 Err(e) => Err(e),
365 });
366 while let Some(item) = stream.try_next().await? {
367 yield item;
368 }
369 })
370 }
371 #[inline]
380 pub async fn fetch_all_as<Adapter, O>(
381 &'q mut self,
382 db_adapter: Adapter,
383 ) -> Result<Vec<O>, Error>
384 where
385 Adapter: BackendDB<'c, DB> + 'c,
386 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
387 {
388 self.fetch_as(db_adapter).await.try_collect().await
389 }
390 pub async fn fetch_one_as<Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
404 where
405 Adapter: BackendDB<'c, DB> + 'c,
406 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
407 {
408 self.fetch_optional_as(db_adapter)
409 .and_then(|o| match o {
410 Some(o) => future::ok(o),
411 None => future::err(Error::RowNotFound),
412 })
413 .await
414 }
415 pub async fn fetch_optional_as<Adapter, O>(
429 &'q mut self,
430
431 db_adapter: Adapter,
432 ) -> Result<Option<O>, Error>
433 where
434 Adapter: BackendDB<'c, DB> + 'c,
435 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
436 {
437 self.fetch_optional(db_adapter)
438 .and_then(|opt_row| async move {
439 if let Some(row) = opt_row {
440 O::from_row(&row).map(Some)
441 } else {
442 Ok(None)
443 }
444 })
445 .await
446 }
447}