1use std::marker::PhantomData;
2
3use askama::Result;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt, stream};
6
7use crate::SqlTemplate;
8use sqlx_core::{
9 Either, Error, database::Database, encode::Encode, executor::Executor, from_row::FromRow,
10 types::Type,
11};
12
13use super::{DatabaseDialect, db_adapter::BackendDB, sql_template_execute::SqlTemplateExecute};
14
/// Pagination summary derived from a total record count and a page size.
#[derive(Debug)]
pub struct PageInfo {
    /// Total number of records.
    pub total: i64,
    /// Number of records per page.
    pub page_size: i64,
    /// Number of pages needed to hold `total` records at `page_size` per page.
    pub page_count: i64,
}

impl PageInfo {
    /// Builds a [`PageInfo`] from a record count and a page size.
    ///
    /// `page_count` is `total / page_size`, rounded up when a partial page
    /// remains. A `page_size` of zero or less cannot describe any page, so
    /// `page_count` is reported as `0` in that case instead of panicking on
    /// the division.
    pub fn new(total: i64, page_size: i64) -> PageInfo {
        let page_count = if page_size <= 0 {
            // Guards against division by zero and the `i64::MIN / -1` overflow.
            0
        } else {
            // Integer division truncates; add one page for a positive remainder.
            total / page_size + i64::from(total % page_size > 0)
        };
        Self {
            total,
            page_size,
            page_count,
        }
    }
}
/// Holds a SQL template together with the settings used when it is rendered
/// and executed against a database backend.
///
/// `'s` is the lifetime parameter of the `SqlTemplate` bound on `T`; `DB` is
/// the target database type.
pub struct DBAdapterManager<'s, DB, T>
where
    DB: Database,
    T: SqlTemplate<'s, DB>,
{
    /// SQL text stored on the manager; starts out empty in `new`.
    pub(crate) sql: String,
    /// The template that gets rendered into SQL.
    pub(crate) template: T,
    /// Flag forwarded to `SqlTemplateExecute::set_persistent`; `new` sets it to `true`.
    persistent: bool,
    /// Zero-sized marker that ties the otherwise unused `'s` and `DB` parameters to the type.
    _p: PhantomData<&'s DB>,
    /// Page size passed on to rendering; `None` until `set_page` is called.
    page_size: Option<i64>,
    /// Page number passed on to rendering; `None` until `set_page` is called.
    page_no: Option<i64>,
}
62
63impl<'q, DB, T> DBAdapterManager<'q, DB, T>
64where
65 DB: Database,
66 T: SqlTemplate<'q, DB>,
67{
68 pub fn new(template: T) -> Self {
73 Self {
74 sql: String::new(),
75 template,
76 persistent: true,
77 page_no: None,
78 page_size: None,
79 _p: PhantomData,
80 }
81 }
82
83 pub fn sql(&self) -> &String {
84 &self.sql
85 }
86}
87impl<'q, 's, DB, T> DBAdapterManager<'s, DB, T>
88where
89 DB: Database,
90 T: SqlTemplate<'s, DB>,
91 i64: Encode<'q, DB> + Type<DB>,
92{
93 pub fn set_persistent(mut self, persistent: bool) -> Self {
95 self.persistent = persistent;
96 self
97 }
98 #[inline]
103 pub async fn count<'c, Adapter>(mut self, db_adapter: Adapter) -> Result<i64, Error>
104 where
105 Adapter: BackendDB<'c, DB>,
106 (i64,): for<'r> FromRow<'r, DB::Row>,
107 {
108 let (mut sql, arg, db_type, executor) = Self::render_sql_with_adapter(
109 self.template.clone(),
110 db_adapter,
111 self.page_no,
112 self.page_size,
113 )
114 .await?;
115
116 db_type.write_count_sql(&mut sql);
117 self.sql = sql;
118 let execute = SqlTemplateExecute::new(self.sql, arg).set_persistent(self.persistent);
119 let (count,): (i64,) = execute.fetch_one_as(executor).await?;
120 Ok(count)
121 }
122 #[inline]
128 pub async fn count_page<'c, Adapter>(
129 self,
130
131 page_size: i64,
132 db_adapter: Adapter,
133 ) -> Result<PageInfo, Error>
134 where
135 Adapter: BackendDB<'c, DB>,
136 (i64,): for<'r> FromRow<'r, DB::Row>,
137 {
138 let count = self.count(db_adapter).await?;
139
140 Ok(PageInfo::new(count, page_size))
141 }
142 pub fn set_page(mut self, page_size: i64, page_no: i64) -> Self {
144 self.page_no = Some(page_no);
145 self.page_size = Some(page_size);
146 self
147 }
148 #[inline]
150 pub async fn render_sql_with_adapter<'c, Adapter>(
151 template: T,
152
153 db_adapter: Adapter,
154 page_no: Option<i64>,
155 page_size: Option<i64>,
156 ) -> Result<
157 (
158 String,
159 Option<DB::Arguments>,
160 impl DatabaseDialect,
161 impl Executor<'c, Database = DB>,
162 ),
163 Error,
164 >
165 where
166 Adapter: BackendDB<'c, DB>,
167 {
168 let (db_type, executor) = db_adapter.backend_db().await?;
169 let f = db_type.get_encode_placeholder_fn();
170 let mut sql = String::new();
171 let mut arg = template.render_sql_with_encode_placeholder_fn(f, &mut sql)?;
172
173 if let (Some(page_no), Some(page_size)) = (page_no, page_size) {
174 let mut args = arg.unwrap_or_default();
175 db_type.write_page_sql(&mut sql, page_size, page_no, &mut args)?;
176 arg = Some(args);
177 }
178 Ok((sql, arg, db_type, executor))
179 }
180
181 #[inline]
184 pub async fn execute<'c, Adapter>(self, db_adapter: Adapter) -> Result<DB::QueryResult, Error>
185 where
186 Adapter: BackendDB<'c, DB>,
187 {
188 self.execute_many(db_adapter).await.try_collect().await
189 }
190 #[inline]
193 pub async fn execute_many<'c, 'e, Adapter>(
194 self,
195
196 db_adapter: Adapter,
197 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
198 where
199 'c: 'e,
200 'q: 'e,
201 Adapter: BackendDB<'c, DB>,
202 {
203 self.fetch_many(db_adapter)
204 .await
205 .try_filter_map(|step| async move {
206 Ok(match step {
207 Either::Left(rows) => Some(rows),
208 Either::Right(_) => None,
209 })
210 })
211 .boxed()
212 }
213 #[inline]
216 pub async fn fetch<'c, 'e, Adapter>(
217 self,
218
219 db_adapter: Adapter,
220 ) -> BoxStream<'e, Result<DB::Row, Error>>
221 where
222 'c: 'e,
223 'q: 'e,
224 Adapter: BackendDB<'c, DB>,
225 {
226 self.fetch_many(db_adapter)
227 .await
228 .try_filter_map(|step| async move {
229 Ok(match step {
230 Either::Left(_) => None,
231 Either::Right(row) => Some(row),
232 })
233 })
234 .boxed()
235 }
236 #[inline]
242 pub async fn fetch_many<'c, 'e, Adapter>(
243 mut self,
244 db_adapter: Adapter,
245 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
246 where
247 'c: 'e,
248 'q: 'e,
249 Adapter: BackendDB<'c, DB>,
250 {
251 let res = Self::render_sql_with_adapter(
252 self.template.clone(),
253 db_adapter,
254 self.page_no,
255 self.page_size,
256 )
257 .await;
258
259 match res {
260 Ok((sql, arg, _db_type, executor)) => {
261 self.sql = sql;
262 let execute =
263 SqlTemplateExecute::new(self.sql, arg).set_persistent(self.persistent);
264 executor.fetch_many(execute)
265 }
266 Err(e) => stream::once(async move { Err(e) }).boxed(),
267 }
268 }
269
270 #[inline]
279 pub async fn fetch_all<'c, Adapter>(self, db_adapter: Adapter) -> Result<Vec<DB::Row>, Error>
280 where
281 Adapter: BackendDB<'c, DB>,
282 {
283 self.fetch(db_adapter).await.try_collect().await
284 }
285 #[inline]
299 pub async fn fetch_one<'c, Adapter>(self, db_adapter: Adapter) -> Result<DB::Row, Error>
300 where
301 Adapter: BackendDB<'c, DB>,
302 {
303 self.fetch_optional(db_adapter)
304 .await
305 .and_then(|row| match row {
306 Some(row) => Ok(row),
307 None => Err(Error::RowNotFound),
308 })
309 }
310 #[inline]
324 pub async fn fetch_optional<'c, Adapter>(
325 self,
326
327 db_adapter: Adapter,
328 ) -> Result<Option<DB::Row>, Error>
329 where
330 Adapter: BackendDB<'c, DB>,
331 {
332 self.fetch(db_adapter).await.try_next().await
333 }
334
335 pub async fn fetch_as<'c, 'e, Adapter, O>(
338 self,
339
340 db_adapter: Adapter,
341 ) -> BoxStream<'e, Result<O, Error>>
342 where
343 'c: 'e,
344 'q: 'e,
345 Adapter: BackendDB<'c, DB>,
346 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
347 {
348 self.fetch_many_as(db_adapter)
349 .await
350 .try_filter_map(|step| async move { Ok(step.right()) })
351 .boxed()
352 }
353 pub async fn fetch_many_as<'c, 'e, Adapter, O>(
357 self,
358 db_adapter: Adapter,
359 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
360 where
361 'c: 'e,
362 'q: 'e,
363 O: Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'e,
364 Adapter: BackendDB<'c, DB>,
365 {
366 self.fetch_many(db_adapter)
367 .await
368 .map(|v| match v {
369 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
370 Ok(Either::Left(v)) => Ok(Either::Left(v)),
371 Err(e) => Err(e),
372 })
373 .boxed()
374 }
375 #[inline]
384 pub async fn fetch_all_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<Vec<O>, Error>
385 where
386 Adapter: BackendDB<'c, DB>,
387 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
388 {
389 self.fetch_as(db_adapter).await.try_collect().await
390 }
391 pub async fn fetch_one_as<'c, Adapter, O>(self, db_adapter: Adapter) -> Result<O, Error>
405 where
406 Adapter: BackendDB<'c, DB>,
407 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
408 {
409 self.fetch_optional_as(db_adapter)
410 .await
411 .and_then(|row| row.ok_or(Error::RowNotFound))
412 }
413 pub async fn fetch_optional_as<'c, Adapter, O>(
427 self,
428
429 db_adapter: Adapter,
430 ) -> Result<Option<O>, Error>
431 where
432 Adapter: BackendDB<'c, DB>,
433 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
434 {
435 let row = self.fetch_optional(db_adapter).await?;
436 if let Some(row) = row {
437 O::from_row(&row).map(Some)
438 } else {
439 Ok(None)
440 }
441 }
442}