1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::SqlTemplate;
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10 types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_templte_execute::SqlTemplateExecute};
14
/// Pagination summary derived from a total row count and a page size.
#[derive(Debug)]
pub struct PageInfo {
    /// Total number of records, as passed to [`PageInfo::new`].
    pub total: i64,
    /// Number of records per page, as passed to [`PageInfo::new`].
    pub page_size: i64,
    /// Number of pages needed to hold `total` records (ceiling division).
    pub page_count: i64,
}

impl PageInfo {
    /// Builds a `PageInfo`, computing `page_count` as `total / page_size`
    /// rounded up.
    ///
    /// A non-positive `page_size` cannot describe a real page, so it yields a
    /// `page_count` of `0` instead of panicking on a division by zero (or
    /// overflowing on `i64::MIN / -1`). `total` and `page_size` are stored
    /// unchanged in every case.
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if page_size > 0 {
            let full_pages = total / page_size;
            // A positive remainder means one extra, partially filled page.
            if total % page_size > 0 {
                full_pages + 1
            } else {
                full_pages
            }
        } else {
            0
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
44pub struct DBAdapterManager<'q, DB, T>
51where
52 DB: Database,
53 T: SqlTemplate<'q, DB>,
54{
55 pub(crate) sql: String,
56 pub(crate) template: T,
57 persistent: bool,
58 _p: PhantomData<&'q DB>,
59 page_size: Option<i64>,
60 page_no: Option<i64>,
61}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65 DB: Database,
66 T: SqlTemplate<'q, DB>,
67{
68 pub fn new(template: T) -> Self {
73 Self {
74 sql: String::new(),
75 template,
76 persistent: true,
77 page_no: None,
78 page_size: None,
79 _p: PhantomData,
80 }
81 }
82
83 pub fn sql(&self) -> &String {
84 &self.sql
85 }
86}
87impl<'q, DB, T> DBAdapterManager<'q, DB, T>
88where
89 DB: Database,
90 T: SqlTemplate<'q, DB>,
91 i64: Encode<'q, DB> + Type<DB>,
92{
93 pub fn set_persistent(mut self, persistent: bool) -> Self {
95 self.persistent = persistent;
96 self
97 }
98 #[inline]
103 pub async fn count<'c, Adapter>(&'q mut self, db_adapter: Adapter) -> Result<i64, Error>
104 where
105 Adapter: BackendDB<'c, DB>,
106 (i64,): for<'r> FromRow<'r, DB::Row>,
107 {
108 let (mut sql, arg, db_type, executor) = Self::render_adapter_sql(
109 self.template.clone(),
110 db_adapter,
111 self.page_no,
112 self.page_size,
113 )
114 .await?;
115
116 db_type.write_count_sql(&mut sql);
117 self.sql = sql;
118 let execute = SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
119 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
120 Ok(count)
121 }
122 #[inline]
128 pub async fn count_page<'c, Adapter>(
129 &'q mut self,
130
131 page_size: i64,
132 db_adapter: Adapter,
133 ) -> Result<PageInfo, Error>
134 where
135 Adapter: BackendDB<'c, DB>,
136 (i64,): for<'r> FromRow<'r, DB::Row>,
137 {
138 let count = self.count(db_adapter).await?;
139
140 Ok(PageInfo::new(count, page_size))
141 }
142 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
144 self.page_no = Some(page_no);
145 self.page_size = Some(page_size);
146 self
147 }
148 #[inline]
150 pub async fn render_adapter_sql<'c, Adapter>(
151 template: T,
152
153 db_adapter: Adapter,
154 page_no: Option<i64>,
155 page_size: Option<i64>,
156 ) -> Result<
157 (
158 String,
159 Option<DB::Arguments<'q>>,
160 impl DatabaseDialect,
161 impl Executor<'c, Database = DB>,
162 ),
163 Error,
164 >
165 where
166 Adapter: BackendDB<'c, DB>,
167 {
168 let (db_type, executor) = db_adapter.backend_db().await?;
169 let f = db_type.get_encode_placeholder_fn();
170 let mut sql = String::new();
171 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
172
173 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
174 let mut args = arg.unwrap_or_default();
175 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
176 arg = Some(args);
177 }
178 Ok((sql, arg, db_type, executor))
179 }
180
181 #[inline]
184 pub async fn execute<'c, Adapter>(
185 &'q mut self,
186 db_adapter: Adapter,
187 ) -> Result<DB::QueryResult, Error>
188 where
189 Adapter: BackendDB<'c, DB>,
190 {
191 self.execute_many(db_adapter).await.try_collect().await
192 }
193 #[inline]
196 pub async fn execute_many<'c, 'e, Adapter>(
197 &'q mut self,
198
199 db_adapter: Adapter,
200 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
201 where
202 'c: 'e,
203 'q: 'e,
204 Adapter: BackendDB<'c, DB>,
205 {
206 self.fetch_many(db_adapter)
207 .await
208 .try_filter_map(|step| async move {
209 Ok(match step {
210 Either::Left(rows) => Some(rows),
211 Either::Right(_) => None,
212 })
213 })
214 .boxed()
215 }
216 #[inline]
219 pub async fn fetch<'c, 'e, Adapter>(
220 &'q mut self,
221
222 db_adapter: Adapter,
223 ) -> BoxStream<'e, Result<DB::Row, Error>>
224 where
225 'c: 'e,
226 'q: 'e,
227 Adapter: BackendDB<'c, DB>,
228 {
229 self.fetch_many(db_adapter)
230 .await
231 .try_filter_map(|step| async move {
232 Ok(match step {
233 Either::Left(_) => None,
234 Either::Right(row) => Some(row),
235 })
236 })
237 .boxed()
238 }
239 #[inline]
245 #[allow(clippy::type_complexity)]
246 pub async fn fetch_many<'c, 'e, Adapter>(
247 &'q mut self,
248 db_adapter: Adapter,
249 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
250 where
251 'c: 'e,
252 'q: 'e,
253 Adapter: BackendDB<'c, DB>,
254 {
255 let res = Self::render_adapter_sql(
256 self.template.clone(),
257 db_adapter,
258 self.page_no,
259 self.page_size,
260 )
261 .await;
262
263 match res {
264 Ok((sql, arg, _db_type, executor)) => {
265 self.sql = sql;
266 let execute =
267 SqlTemplateExecute::new(&self.sql, arg).set_persistent(self.persistent);
268 execute.fetch_many(executor)
269 }
270 Err(e) => stream::once(async move { Err(e) }).boxed(),
271 }
272 }
273
274 #[inline]
283 pub async fn fetch_all<'c, Adapter>(
284 &'q mut self,
285 db_adapter: Adapter,
286 ) -> Result<Vec<DB::Row>, Error>
287 where
288 Adapter: BackendDB<'c, DB>,
289 {
290 self.fetch(db_adapter).await.try_collect().await
291 }
292 #[inline]
306 pub async fn fetch_one<'c, Adapter>(&'q mut self, db_adapter: Adapter) -> Result<DB::Row, Error>
307 where
308 Adapter: BackendDB<'c, DB>,
309 {
310 self.fetch_optional(db_adapter)
311 .await
312 .and_then(|row| match row {
313 Some(row) => Ok(row),
314 None => Err(Error::RowNotFound),
315 })
316 }
317 #[inline]
331 pub async fn fetch_optional<'c, Adapter>(
332 &'q mut self,
333
334 db_adapter: Adapter,
335 ) -> Result<Option<DB::Row>, Error>
336 where
337 Adapter: BackendDB<'c, DB>,
338 {
339 self.fetch(db_adapter).await.try_next().await
340 }
341
342 pub async fn fetch_as<'c, 'e, Adapter, O>(
345 &'q mut self,
346
347 db_adapter: Adapter,
348 ) -> BoxStream<'e, Result<O, Error>>
349 where
350 'c: 'e,
351 'q: 'e,
352 Adapter: BackendDB<'c, DB>,
353 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
354 {
355 self.fetch_many_as(db_adapter)
356 .await
357 .try_filter_map(|step| async move { Ok(step.right()) })
358 .boxed()
359 }
360 pub async fn fetch_many_as<'c, 'e, Adapter, O>(
364 &'q mut self,
365 db_adapter: Adapter,
366 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
367 where
368 'c: 'e,
369 'q: 'e,
370 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
371 Adapter: BackendDB<'c, DB>,
372 {
373 self.fetch_many(db_adapter)
374 .await
375 .map(|v| match v {
376 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
377 Ok(Either::Left(v)) => Ok(Either::Left(v)),
378 Err(e) => Err(e),
379 })
380 .boxed()
381 }
382 #[inline]
391 pub async fn fetch_all_as<'c, Adapter, O>(
392 &'q mut self,
393 db_adapter: Adapter,
394 ) -> Result<Vec<O>, Error>
395 where
396 Adapter: BackendDB<'c, DB>,
397 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
398 {
399 self.fetch_as(db_adapter).await.try_collect().await
400 }
401 pub async fn fetch_one_as<'c, Adapter, O>(&'q mut self, db_adapter: Adapter) -> Result<O, Error>
415 where
416 Adapter: BackendDB<'c, DB>,
417 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
418 {
419 self.fetch_optional_as(db_adapter)
420 .await
421 .and_then(|row| row.ok_or(Error::RowNotFound))
422 }
423 pub async fn fetch_optional_as<'c, Adapter, O>(
437 &'q mut self,
438
439 db_adapter: Adapter,
440 ) -> Result<Option<O>, Error>
441 where
442 Adapter: BackendDB<'c, DB>,
443 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
444 {
445 let row = self.fetch_optional(db_adapter).await?;
446 if let Some(row) = row {
447 O::from_row(&row).map(Some)
448 } else {
449 Ok(None)
450 }
451 }
452}