1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{Stream, future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future};
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, from_row::FromRow, types::Type,
10};
11
12use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
13
14#[derive(Debug, PartialEq, Eq)]
16pub struct PageInfo {
17 pub total: i64,
19 pub page_size: i64,
21 pub page_count: i64,
23}
24
25impl PageInfo {
26 pub fn new(total: i64, page_size: i64) -> PageInfo {
32 let mut page_count = total / page_size;
33 if total % page_size > 0 {
34 page_count += 1;
35 }
36 Self {
37 total,
38 page_size,
39 page_count,
40 }
41 }
42}
43pub struct DBAdapterManager<'q, DB, T>
50where
51 DB: Database,
52 T: SqlTemplate<'q, DB>,
53{
54 pub(crate) sql: String,
55 pub(crate) template: T,
56 persistent: bool,
57 _p: PhantomData<&'q DB>,
58 page_size: Option<i64>,
59 page_no: Option<i64>,
60}
61
62impl<'q, DB, T> DBAdapterManager<'q, DB, T>
63where
64 DB: Database,
65 T: SqlTemplate<'q, DB>,
66{
67 pub fn new(template: T) -> Self {
72 Self {
73 sql: String::new(),
74 template,
75 persistent: true,
76 page_no: None,
77 page_size: None,
78 _p: PhantomData,
79 }
80 }
81
82 pub fn sql(&self) -> &String {
83 &self.sql
84 }
85}
86impl<'q, 'c, 'e, DB, T> DBAdapterManager<'q, DB, T>
87where
88 DB: Database + Sync,
89 T: SqlTemplate<'q, DB> + Send + 'q,
90 i64: Encode<'q, DB> + Type<DB>,
91 DB::Arguments<'q>: 'q,
92 'q: 'e,
93 'c: 'e,
94{
95 pub fn set_persistent(mut self, persistent: bool) -> Self {
97 self.persistent = persistent;
98 self
99 }
100 #[inline]
105 pub fn count<Adapter>(&'q mut self, db_adapter: Adapter) -> BoxFuture<'e, Result<i64, Error>>
106 where
107 Adapter: BackendDB<'c, DB> + 'c,
108 (i64,): for<'r> FromRow<'r, DB::Row>,
109 {
110 let template = self.template.clone();
111
112 async move {
113 let (db_type, executor) = db_adapter.backend_db().await?;
114 let f = db_type.get_encode_placeholder_fn();
115 let mut sql = String::new();
116 let arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
117
118 db_type.write_count_sql(&mut sql);
119 self.sql = sql;
120 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
121 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
122 Ok(count)
123 }
124 .boxed()
125 }
126 #[inline]
132 pub async fn count_page<Adapter>(
133 &'q mut self,
134 page_size: i64,
135 db_adapter: Adapter,
136 ) -> Result<PageInfo, Error>
137 where
138 Adapter: BackendDB<'c, DB> + 'c,
139 (i64,): for<'r> FromRow<'r, DB::Row>,
140 {
141 let count = self.count(db_adapter).await?;
142 Ok(PageInfo::new(count, page_size))
143 }
144 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
146 self.page_no = Some(page_no);
147 self.page_size = Some(page_size);
148 self
149 }
150
151 #[inline]
154 pub async fn execute<Adapter>(
155 &'q mut self,
156 db_adapter: Adapter,
157 ) -> Result<DB::QueryResult, Error>
158 where
159 Adapter: BackendDB<'c, DB> + 'c,
160 {
161 self.execute_many(db_adapter).try_collect().await
162 }
163 #[inline]
166 pub fn execute_many<Adapter>(
167 &'q mut self,
168
169 db_adapter: Adapter,
170 ) -> impl Stream<Item = Result<DB::QueryResult, Error>>
171 where
172 Adapter: BackendDB<'c, DB> + 'c,
173 {
174 self.fetch_many(db_adapter)
175 .try_filter_map(|step| async move {
176 Ok(match step {
177 Either::Left(rows) => Some(rows),
178 Either::Right(_) => None,
179 })
180 })
181 }
182 #[inline]
185 pub fn fetch<Adapter>(
186 &'q mut self,
187 db_adapter: Adapter,
188 ) -> impl Stream<Item = Result<DB::Row, Error>>
189 where
190 Adapter: BackendDB<'c, DB> + 'c,
191 {
192 self.fetch_many(db_adapter)
193 .try_filter_map(|step| async move {
194 Ok(match step {
195 Either::Left(_) => None,
196 Either::Right(row) => Some(row),
197 })
198 })
199 }
200 #[inline]
206 #[allow(clippy::type_complexity)]
207 pub fn fetch_many<Adapter>(
208 &'q mut self,
209 db_adapter: Adapter,
210 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
211 where
212 Adapter: BackendDB<'c, DB> + 'c,
213 {
214 let template = self.template.clone();
215 let page_no = self.page_no;
216 let page_size = self.page_size;
217 Box::pin(async_stream::try_stream! {
218 let (db_type, executor) = db_adapter.backend_db().await?;
219 let f = db_type.get_encode_placeholder_fn();
220 let mut sql = String::new();
221 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
222
223 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
224 let mut args = arg.unwrap_or_default();
225 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
226 arg = Some(args);
227 }
228
229 self.sql = sql;
230 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
231 let mut stream = execute.fetch_many(executor);
232 while let Some(item) = stream.try_next().await? {
233 yield item;
234 }
235 })
236 }
237
238 #[inline]
247 pub async fn fetch_all<Adapter>(
248 &'q mut self,
249 db_adapter: Adapter,
250 ) -> Result<Vec<DB::Row>, Error>
251 where
252 Adapter: BackendDB<'c, DB> + 'c,
253 {
254 self.fetch(db_adapter).try_collect().await
255 }
256 #[inline]
270 pub async fn fetch_one<Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
271 where
272 Adapter: BackendDB<'c, DB> + 'c,
273 {
274 self.fetch_optional(db_adapter)
275 .and_then(|row| match row {
276 Some(row) => future::ok(row),
277 None => future::err(Error::RowNotFound),
278 })
279 .await
280 }
281 #[inline]
295 pub async fn fetch_optional<Adapter>(
296 &'q mut self,
297
298 db_adapter: Adapter,
299 ) -> Result<Option<DB::Row>, Error>
300 where
301 Adapter: BackendDB<'c, DB> + 'c,
302 {
303 let row = self.fetch_many(db_adapter).try_next().await?;
304 match row {
305 Some(Either::Right(row)) => Ok(Some(row)),
306 Some(Either::Left(_)) => Ok(None),
307 None => Ok(None),
308 }
309 }
310
311 pub async fn fetch_as<Adapter, O>(
314 &'q mut self,
315
316 db_adapter: Adapter,
317 ) -> impl Stream<Item = Result<O, Error>>
318 where
319 Adapter: BackendDB<'c, DB> + 'c,
320 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
321 {
322 self.fetch_many_as(db_adapter)
323 .try_filter_map(|step| async move { Ok(step.right()) })
324 }
325 pub fn fetch_many_as<Adapter, O>(
329 &'q mut self,
330 db_adapter: Adapter,
331 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
332 where
333 Adapter: BackendDB<'c, DB> + 'c,
334 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
335 {
336 let template = self.template.clone();
337 let page_no = self.page_no;
338 let page_size = self.page_size;
339 Box::pin(async_stream::try_stream! {
340 let (db_type, executor) = db_adapter.backend_db().await?;
341 let f = db_type.get_encode_placeholder_fn();
342 let mut sql = String::new();
343 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
344
345 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
346 let mut args = arg.unwrap_or_default();
347 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
348 arg = Some(args);
349 }
350
351 self.sql = sql;
352 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
353 let mut stream = execute.fetch_many(executor).map(|v| match v {
354 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
355 Ok(Either::Left(v)) => Ok(Either::Left(v)),
356 Err(e) => Err(e),
357 });
358 while let Some(item) = stream.try_next().await? {
359 yield item;
360 }
361 })
362 }
363 #[inline]
372 pub async fn fetch_all_as<Adapter, O>(
373 &'q mut self,
374 db_adapter: Adapter,
375 ) -> Result<Vec<O>, Error>
376 where
377 Adapter: BackendDB<'c, DB> + 'c,
378 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
379 {
380 self.fetch_as(db_adapter).await.try_collect().await
381 }
382 pub async fn fetch_one_as<Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
396 where
397 Adapter: BackendDB<'c, DB> + 'c,
398 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
399 {
400 self.fetch_optional_as(db_adapter)
401 .and_then(|o| match o {
402 Some(o) => future::ok(o),
403 None => future::err(Error::RowNotFound),
404 })
405 .await
406 }
407 pub async fn fetch_optional_as<Adapter, O>(
421 &'q mut self,
422
423 db_adapter: Adapter,
424 ) -> Result<Option<O>, Error>
425 where
426 Adapter: BackendDB<'c, DB> + 'c,
427 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
428 {
429 let obj = self.fetch_many_as(db_adapter).try_next().await?;
430 match obj {
431 Some(Either::Right(o)) => Ok(Some(o)),
432 Some(Either::Left(_)) => Ok(None),
433 None => Ok(None),
434 }
435 }
436}