1use std::marker::PhantomData;
2
3use crate::SqlTemplate;
4use askama::Result;
5use futures_core::{Stream, future::BoxFuture, stream::BoxStream};
6
7use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future};
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, from_row::FromRow, types::Type,
10};
11
12use crate::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
13
/// Summary of how a result set of `total` rows splits into pages.
#[derive(Debug, PartialEq, Eq)]
pub struct PaginationInfo {
    /// Total number of rows.
    pub total: i64,
    /// Number of rows per page, stored exactly as passed to [`PaginationInfo::new`].
    pub pagination_size: i64,
    /// Number of pages needed to hold `total` rows (a partial last page counts as one).
    pub pagination_count: i64,
}

impl PaginationInfo {
    /// Builds the pagination summary for `total` rows split into pages of
    /// `pagination_size` rows.
    ///
    /// The page count is `total / pagination_size`, plus one when a positive
    /// remainder is left over.
    ///
    /// A `pagination_size` of zero or less cannot describe a page, so it yields
    /// `pagination_count == 0` instead of panicking on a division by zero (or
    /// producing a negative page count).
    pub fn new(total: i64, pagination_size: i64) -> PaginationInfo {
        let pagination_count = if pagination_size <= 0 {
            // Guard: `total / 0` and `total % 0` would panic.
            0
        } else {
            let full_pages = total / pagination_size;
            if total % pagination_size > 0 {
                // A trailing partial page still needs its own page.
                full_pages + 1
            } else {
                full_pages
            }
        };

        Self {
            total,
            pagination_size,
            pagination_count,
        }
    }
}
43pub struct DBAdapter<'q, DB, T>
50where
51 DB: Database,
52 T: SqlTemplate<'q, DB>,
53{
54 pub(crate) template: T,
55 persistent: bool,
56 _p: PhantomData<&'q DB>,
57 pagination_size: Option<i64>,
58 pagination_no: Option<i64>,
59}
60
61impl<'q, DB, T> DBAdapter<'q, DB, T>
62where
63 DB: Database,
64 T: SqlTemplate<'q, DB>,
65{
66 pub fn new(template: T) -> Self {
71 Self {
72 template,
73 persistent: true,
74 pagination_no: None,
75 pagination_size: None,
76 _p: PhantomData,
77 }
78 }
79}
80impl<'q, 'c, 'e, DB, T> DBAdapter<'q, DB, T>
81where
82 DB: Database + Sync,
83 T: SqlTemplate<'q, DB> + Send + 'q,
84 i64: Encode<'q, DB> + Type<DB>,
85 DB::Arguments: 'q,
86 'q: 'e,
87 'c: 'e,
88{
89 pub fn set_persistent(mut self, persistent: bool) -> Self {
91 self.persistent = persistent;
92 self
93 }
94 #[inline]
99 pub fn count<Adapter>(self, db_adapter: Adapter) -> BoxFuture<'e, Result<i64, Error>>
100 where
101 Adapter: BackendDB<'c, DB> + 'c,
102 (i64,): for<'r> FromRow<'r, DB::Row>,
103 {
104 let template = self.template.clone();
105
106 async move {
107 let (db_type, executor) = db_adapter.backend_db().await?;
108 let f = db_type.placeholder_fn();
109 let mut sql = String::new();
110 let arg = template.render_with_placeholder(f, &mut sql)?;
111
112 db_type.write_count_sql(&mut sql);
113 let execute = SqlTemplateExecute::new(sql, arg).set_persistent(self.persistent);
114 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
115 Ok(count)
116 }
117 .boxed()
118 }
119 #[inline]
125 pub async fn pagination_info<Adapter>(
126 self,
127 pagination_size: i64,
128 db_adapter: Adapter,
129 ) -> Result<PaginationInfo, Error>
130 where
131 Adapter: BackendDB<'c, DB> + 'c,
132 (i64,): for<'r> FromRow<'r, DB::Row>,
133 {
134 let count = self.count(db_adapter).await?;
135 Ok(PaginationInfo::new(count, pagination_size))
136 }
137 pub fn set_pagination(mut self, pagination_size: i64, pagination_no: i64) -> Self {
139 self.pagination_no = Some(pagination_no);
140 self.pagination_size = Some(pagination_size);
141 self
142 }
143
144 #[inline]
147 pub async fn execute<Adapter>(self, db_adapter: Adapter) -> Result<DB::QueryResult, Error>
148 where
149 Adapter: BackendDB<'c, DB> + 'c,
150 {
151 self.execute_many(db_adapter).try_collect().await
152 }
153 #[inline]
156 pub fn execute_many<Adapter>(
157 self,
158
159 db_adapter: Adapter,
160 ) -> impl Stream<Item = Result<DB::QueryResult, Error>>
161 where
162 Adapter: BackendDB<'c, DB> + 'c,
163 {
164 self.fetch_many(db_adapter)
165 .try_filter_map(|step| async move {
166 Ok(match step {
167 Either::Left(rows) => Some(rows),
168 Either::Right(_) => None,
169 })
170 })
171 }
172 #[inline]
175 pub fn fetch<Adapter>(self, db_adapter: Adapter) -> impl Stream<Item = Result<DB::Row, Error>>
176 where
177 Adapter: BackendDB<'c, DB> + 'c,
178 {
179 self.fetch_many(db_adapter)
180 .try_filter_map(|step| async move {
181 Ok(match step {
182 Either::Left(_) => None,
183 Either::Right(row) => Some(row),
184 })
185 })
186 }
187 #[inline]
193 #[allow(clippy::type_complexity)]
194 pub fn fetch_many<Adapter>(
195 self,
196 db_adapter: Adapter,
197 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
198 where
199 Adapter: BackendDB<'c, DB> + 'c,
200 {
201 let template = self.template.clone();
202 let pagination_no = self.pagination_no;
203 let pagination_size = self.pagination_size;
204 Box::pin(async_stream::try_stream! {
205 let (db_type, executor) = db_adapter.backend_db().await?;
206 let f = db_type.placeholder_fn();
207 let mut sql = String::new();
208 let mut arg = template.render_with_placeholder(f, &mut sql)?;
209
210 if let (Some(pagination_no), Some(pagination_size)) = (pagination_no, pagination_size) {
211 let mut args = arg.unwrap_or_default();
212 db_type.write_pagination_sql(&mut sql, pagination_size, pagination_no, &mut args)?;
213 arg = Some(args);
214 }
215
216 let execute = SqlTemplateExecute::new(sql, arg).set_persistent(self.persistent);
217 let mut stream = execute.fetch_many(executor);
218 while let Some(item) = stream.try_next().await? {
219 yield item;
220 }
221 })
222 }
223
224 #[inline]
233 pub async fn fetch_all<Adapter>(self, db_adapter: Adapter) -> Result<Vec<DB::Row>, Error>
234 where
235 Adapter: BackendDB<'c, DB> + 'c,
236 {
237 self.fetch(db_adapter).try_collect().await
238 }
239 #[inline]
253 pub async fn fetch_one<Adapter>(self, db_adapter: Adapter) -> Result<DB::Row, Error>
254 where
255 Adapter: BackendDB<'c, DB> + 'c,
256 {
257 self.fetch_optional(db_adapter)
258 .and_then(|row| match row {
259 Some(row) => future::ok(row),
260 None => future::err(Error::RowNotFound),
261 })
262 .await
263 }
264 #[inline]
278 pub async fn fetch_optional<Adapter>(
279 self,
280
281 db_adapter: Adapter,
282 ) -> Result<Option<DB::Row>, Error>
283 where
284 Adapter: BackendDB<'c, DB> + 'c,
285 {
286 let row = self.fetch_many(db_adapter).try_next().await?;
287 match row {
288 Some(Either::Right(row)) => Ok(Some(row)),
289 Some(Either::Left(_)) => Ok(None),
290 None => Ok(None),
291 }
292 }
293
294 pub async fn fetch_as<Adapter, O>(
297 self,
298
299 db_adapter: Adapter,
300 ) -> impl Stream<Item = Result<O, Error>>
301 where
302 Adapter: BackendDB<'c, DB> + 'c,
303 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
304 {
305 self.fetch_many_as(db_adapter)
306 .try_filter_map(|step| async move { Ok(step.right()) })
307 }
308 pub fn fetch_many_as<Adapter, O>(
312 self,
313 db_adapter: Adapter,
314 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
315 where
316 'q: 'e,
317 Adapter: BackendDB<'c, DB> + 'c,
318 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
319 {
320 let template = self.template.clone();
321 let pagination_no = self.pagination_no;
322 let pagination_size = self.pagination_size;
323 Box::pin(async_stream::try_stream! {
324 let (db_type, executor) = db_adapter.backend_db().await?;
325 let f = db_type.placeholder_fn();
326 let mut sql = String::new();
327 let mut arg = template.render_with_placeholder(f, &mut sql)?;
328
329 if let (Some(pagination_no), Some(pagination_size)) = (pagination_no, pagination_size) {
330 let mut args = arg.unwrap_or_default();
331 db_type.write_pagination_sql(&mut sql, pagination_size, pagination_no, &mut args)?;
332 arg = Some(args);
333 }
334
335 let execute = SqlTemplateExecute::new(sql, arg).set_persistent(self.persistent);
336 let mut stream = execute.fetch_many(executor).map(|v| match v {
337 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
338 Ok(Either::Left(v)) => Ok(Either::Left(v)),
339 Err(e) => Err(e),
340 });
341 while let Some(item) = stream.try_next().await? {
342 yield item;
343 }
344 })
345 }
346 #[inline]
355 pub async fn fetch_all_as<Adapter, O>(self, db_adapter: Adapter) -> Result<Vec<O>, Error>
356 where
357 Adapter: BackendDB<'c, DB> + 'c,
358 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
359 {
360 self.fetch_as(db_adapter).await.try_collect().await
361 }
362 pub async fn fetch_one_as<Adapter, O>(self, db_adapter: Adapter) -> Result<O, Error>
376 where
377 Adapter: BackendDB<'c, DB> + 'c,
378 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
379 {
380 self.fetch_optional_as(db_adapter)
381 .and_then(|o| match o {
382 Some(o) => future::ok(o),
383 None => future::err(Error::RowNotFound),
384 })
385 .await
386 }
387 pub async fn fetch_optional_as<Adapter, O>(
401 self,
402
403 db_adapter: Adapter,
404 ) -> Result<Option<O>, Error>
405 where
406 Adapter: BackendDB<'c, DB> + 'c,
407 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
408 {
409 let row = self.fetch_many_as(db_adapter).try_next().await?;
410 match row {
411 Some(Either::Right(o)) => Ok(Some(o)),
412 Some(Either::Left(_)) => Ok(None),
413 None => Ok(None),
414 }
415 }
416}