1use std::marker::PhantomData;
2
3use either::Either;
4use futures_core::stream::BoxStream;
5use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
6
7use crate::arguments::{Arguments, IntoArguments};
8use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
9use crate::encode::Encode;
10use crate::error::Error;
11use crate::executor::{Execute, Executor};
12use crate::statement::Statement;
13use crate::types::Type;
14
15#[must_use = "query must be executed to affect database"]
17pub struct Query<'q, DB: Database, A> {
18 pub(crate) statement: Either<&'q str, &'q <DB as HasStatement<'q>>::Statement>,
19 pub(crate) arguments: Option<A>,
20 pub(crate) database: PhantomData<DB>,
21 pub(crate) persistent: bool,
22}
23
24#[must_use = "query must be executed to affect database"]
34pub struct Map<'q, DB: Database, F, A> {
35 inner: Query<'q, DB, A>,
36 mapper: F,
37}
38
39impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A>
40where
41 DB: Database,
42 A: Send + IntoArguments<'q, DB>,
43{
44 #[inline]
45 fn sql(&self) -> &'q str {
46 match self.statement {
47 Either::Right(ref statement) => statement.sql(),
48 Either::Left(sql) => sql,
49 }
50 }
51
52 fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
53 match self.statement {
54 Either::Right(ref statement) => Some(&statement),
55 Either::Left(_) => None,
56 }
57 }
58
59 #[inline]
60 fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
61 self.arguments.take().map(IntoArguments::into_arguments)
62 }
63
64 #[inline]
65 fn persistent(&self) -> bool {
66 self.persistent
67 }
68}
69
70impl<'q, DB: Database> Query<'q, DB, <DB as HasArguments<'q>>::Arguments> {
71 pub fn bind<T: 'q + Send + Encode<'q, DB> + Type<DB>>(mut self, value: T) -> Self {
80 self.arguments
81 .get_or_insert_with(Default::default)
82 .add(value);
83 self
84 }
85}
86
87impl<'q, DB, A> Query<'q, DB, A>
88where
89 DB: Database + HasStatementCache,
90{
91 pub fn persistent(mut self, value: bool) -> Self {
100 self.persistent = value;
101 self
102 }
103}
104
105impl<'q, DB, A: Send> Query<'q, DB, A>
106where
107 DB: Database,
108 A: 'q + IntoArguments<'q, DB>,
109{
110 #[inline]
117 pub fn map<F, O>(
118 self,
119 mut f: F,
120 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, Error> + Send, A>
121 where
122 F: FnMut(DB::Row) -> O + Send,
123 O: Unpin,
124 {
125 self.try_map(move |row| Ok(f(row)))
126 }
127
128 #[inline]
133 pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A>
134 where
135 F: FnMut(DB::Row) -> Result<O, Error> + Send,
136 O: Unpin,
137 {
138 Map {
139 inner: self,
140 mapper: f,
141 }
142 }
143
144 #[inline]
146 pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
147 where
148 'q: 'e,
149 A: 'e,
150 E: Executor<'c, Database = DB>,
151 {
152 executor.execute(self).await
153 }
154
155 #[inline]
157 pub async fn execute_many<'e, 'c: 'e, E>(
158 self,
159 executor: E,
160 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
161 where
162 'q: 'e,
163 A: 'e,
164 E: Executor<'c, Database = DB>,
165 {
166 executor.execute_many(self)
167 }
168
169 #[inline]
171 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
172 where
173 'q: 'e,
174 A: 'e,
175 E: Executor<'c, Database = DB>,
176 {
177 executor.fetch(self)
178 }
179
180 #[inline]
183 pub fn fetch_many<'e, 'c: 'e, E>(
184 self,
185 executor: E,
186 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
187 where
188 'q: 'e,
189 A: 'e,
190 E: Executor<'c, Database = DB>,
191 {
192 executor.fetch_many(self)
193 }
194
195 #[inline]
197 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
198 where
199 'q: 'e,
200 A: 'e,
201 E: Executor<'c, Database = DB>,
202 {
203 executor.fetch_all(self).await
204 }
205
206 #[inline]
208 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
209 where
210 'q: 'e,
211 A: 'e,
212 E: Executor<'c, Database = DB>,
213 {
214 executor.fetch_one(self).await
215 }
216
217 #[inline]
219 pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
220 where
221 'q: 'e,
222 A: 'e,
223 E: Executor<'c, Database = DB>,
224 {
225 executor.fetch_optional(self).await
226 }
227}
228
229impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<'q, DB, F, A>
230where
231 DB: Database,
232 A: IntoArguments<'q, DB>,
233{
234 #[inline]
235 fn sql(&self) -> &'q str {
236 self.inner.sql()
237 }
238
239 #[inline]
240 fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
241 self.inner.statement()
242 }
243
244 #[inline]
245 fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
246 self.inner.take_arguments()
247 }
248
249 #[inline]
250 fn persistent(&self) -> bool {
251 self.inner.arguments.is_some()
252 }
253}
254
255impl<'q, DB, F, O, A> Map<'q, DB, F, A>
256where
257 DB: Database,
258 F: FnMut(DB::Row) -> Result<O, Error> + Send,
259 O: Send + Unpin,
260 A: 'q + Send + IntoArguments<'q, DB>,
261{
262 #[inline]
269 pub fn map<G, P>(
270 self,
271 mut g: G,
272 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
273 where
274 G: FnMut(O) -> P + Send,
275 P: Unpin,
276 {
277 self.try_map(move |data| Ok(g(data)))
278 }
279
280 #[inline]
285 pub fn try_map<G, P>(
286 self,
287 mut g: G,
288 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
289 where
290 G: FnMut(O) -> Result<P, Error> + Send,
291 P: Unpin,
292 {
293 let mut f = self.mapper;
294 Map {
295 inner: self.inner,
296 mapper: move |row| f(row).and_then(|o| g(o)),
297 }
298 }
299
300 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
302 where
303 'q: 'e,
304 E: 'e + Executor<'c, Database = DB>,
305 DB: 'e,
306 F: 'e,
307 O: 'e,
308 {
309 self.fetch_many(executor)
310 .try_filter_map(|step| async move {
311 Ok(match step {
312 Either::Left(_) => None,
313 Either::Right(o) => Some(o),
314 })
315 })
316 .boxed()
317 }
318
319 pub fn fetch_many<'e, 'c: 'e, E>(
322 mut self,
323 executor: E,
324 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
325 where
326 'q: 'e,
327 E: 'e + Executor<'c, Database = DB>,
328 DB: 'e,
329 F: 'e,
330 O: 'e,
331 {
332 Box::pin(try_stream! {
333 let mut s = executor.fetch_many(self.inner);
334
335 while let Some(v) = s.try_next().await? {
336 r#yield!(match v {
337 Either::Left(v) => Either::Left(v),
338 Either::Right(row) => {
339 Either::Right((self.mapper)(row)?)
340 }
341 });
342 }
343
344 Ok(())
345 })
346 }
347
348 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
350 where
351 'q: 'e,
352 E: 'e + Executor<'c, Database = DB>,
353 DB: 'e,
354 F: 'e,
355 O: 'e,
356 {
357 self.fetch(executor).try_collect().await
358 }
359
360 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
362 where
363 'q: 'e,
364 E: 'e + Executor<'c, Database = DB>,
365 DB: 'e,
366 F: 'e,
367 O: 'e,
368 {
369 self.fetch_optional(executor)
370 .and_then(|row| match row {
371 Some(row) => future::ok(row),
372 None => future::err(Error::RowNotFound),
373 })
374 .await
375 }
376
377 pub async fn fetch_optional<'e, 'c: 'e, E>(mut self, executor: E) -> Result<Option<O>, Error>
379 where
380 'q: 'e,
381 E: 'e + Executor<'c, Database = DB>,
382 DB: 'e,
383 F: 'e,
384 O: 'e,
385 {
386 let row = executor.fetch_optional(self.inner).await?;
387
388 if let Some(row) = row {
389 (self.mapper)(row).map(Some)
390 } else {
391 Ok(None)
392 }
393 }
394}
395
396pub(crate) fn query_statement<'q, DB>(
398 statement: &'q <DB as HasStatement<'q>>::Statement,
399) -> Query<'q, DB, <DB as HasArguments<'q>>::Arguments>
400where
401 DB: Database,
402{
403 Query {
404 database: PhantomData,
405 arguments: Some(Default::default()), statement: Either::Right(statement),
407 persistent: true,
408 }
409}
410
411pub(crate) fn query_statement_with<'q, DB, A>(
413 statement: &'q <DB as HasStatement<'q>>::Statement,
414 arguments: A,
415) -> Query<'q, DB, A>
416where
417 DB: Database,
418 A: IntoArguments<'q, DB>,
419{
420 Query {
421 database: PhantomData,
422 arguments: Some(arguments),
423 statement: Either::Right(statement),
424 persistent: true,
425 }
426}
427
428pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as HasArguments<'_>>::Arguments>
430where
431 DB: Database,
432{
433 Query {
434 database: PhantomData,
435 arguments: None,
436 statement: Either::Left(sql),
437 persistent: true,
438 }
439}
440
441pub fn query_with<'q, DB, A>(sql: &'q str, arguments: A) -> Query<'q, DB, A>
443where
444 DB: Database,
445 A: IntoArguments<'q, DB>,
446{
447 Query {
448 database: PhantomData,
449 arguments: Some(arguments),
450 statement: Either::Left(sql),
451 persistent: true,
452 }
453}