1use std::marker::PhantomData;
2
3use either::Either;
4use futures_core::stream::BoxStream;
5use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
6
7use crate::arguments::{Arguments, IntoArguments};
8use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
9use crate::encode::Encode;
10use crate::error::Error;
11use crate::executor::{Execute, Executor};
12use crate::statement::Statement;
13use crate::types::Type;
14
15#[must_use = "query must be executed to affect database"]
17pub struct Query<'q, DB: Database, A> {
18 pub(crate) statement: Either<&'q str, &'q <DB as HasStatement<'q>>::Statement>,
19 pub(crate) arguments: Option<A>,
20 pub(crate) database: PhantomData<DB>,
21 pub(crate) persistent: bool,
22}
23
24#[must_use = "query must be executed to affect database"]
34pub struct Map<'q, DB: Database, F, A> {
35 inner: Query<'q, DB, A>,
36 mapper: F,
37}
38
39impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A>
40where
41 DB: Database,
42 A: Send + IntoArguments<'q, DB>,
43{
44 #[inline]
45 fn sql(&self) -> &'q str {
46 match self.statement {
47 Either::Right(ref statement) => statement.sql(),
48 Either::Left(sql) => sql,
49 }
50 }
51
52 fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
53 match self.statement {
54 Either::Right(ref statement) => Some(&statement),
55 Either::Left(_) => None,
56 }
57 }
58
59 #[inline]
60 fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
61 self.arguments.take().map(IntoArguments::into_arguments)
62 }
63
64 #[inline]
65 fn persistent(&self) -> bool {
66 self.persistent
67 }
68}
69
70impl<'q, DB: Database> Query<'q, DB, <DB as HasArguments<'q>>::Arguments> {
71 pub fn bind<T: 'q + Send + Encode<'q, DB> + Type<DB>>(mut self, value: T) -> Self {
80 if let Some(arguments) = &mut self.arguments {
81 arguments.add(value);
82 }
83
84 self
85 }
86}
87
88impl<'q, DB, A> Query<'q, DB, A>
89where
90 DB: Database + HasStatementCache,
91{
92 pub fn persistent(mut self, value: bool) -> Self {
101 self.persistent = value;
102 self
103 }
104}
105
106impl<'q, DB, A: Send> Query<'q, DB, A>
107where
108 DB: Database,
109 A: 'q + IntoArguments<'q, DB>,
110{
111 #[inline]
118 pub fn map<F, O>(
119 self,
120 mut f: F,
121 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, Error> + Send, A>
122 where
123 F: FnMut(DB::Row) -> O + Send,
124 O: Unpin,
125 {
126 self.try_map(move |row| Ok(f(row)))
127 }
128
129 #[inline]
134 pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A>
135 where
136 F: FnMut(DB::Row) -> Result<O, Error> + Send,
137 O: Unpin,
138 {
139 Map {
140 inner: self,
141 mapper: f,
142 }
143 }
144
145 #[inline]
147 pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
148 where
149 'q: 'e,
150 A: 'e,
151 E: Executor<'c, Database = DB>,
152 {
153 executor.execute(self).await
154 }
155
156 #[inline]
158 pub async fn execute_many<'e, 'c: 'e, E>(
159 self,
160 executor: E,
161 ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
162 where
163 'q: 'e,
164 A: 'e,
165 E: Executor<'c, Database = DB>,
166 {
167 executor.execute_many(self)
168 }
169
170 #[inline]
172 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
173 where
174 'q: 'e,
175 A: 'e,
176 E: Executor<'c, Database = DB>,
177 {
178 executor.fetch(self)
179 }
180
181 #[inline]
184 pub fn fetch_many<'e, 'c: 'e, E>(
185 self,
186 executor: E,
187 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
188 where
189 'q: 'e,
190 A: 'e,
191 E: Executor<'c, Database = DB>,
192 {
193 executor.fetch_many(self)
194 }
195
196 #[inline]
198 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
199 where
200 'q: 'e,
201 A: 'e,
202 E: Executor<'c, Database = DB>,
203 {
204 executor.fetch_all(self).await
205 }
206
207 #[inline]
209 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
210 where
211 'q: 'e,
212 A: 'e,
213 E: Executor<'c, Database = DB>,
214 {
215 executor.fetch_one(self).await
216 }
217
218 #[inline]
220 pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
221 where
222 'q: 'e,
223 A: 'e,
224 E: Executor<'c, Database = DB>,
225 {
226 executor.fetch_optional(self).await
227 }
228}
229
230impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<'q, DB, F, A>
231where
232 DB: Database,
233 A: IntoArguments<'q, DB>,
234{
235 #[inline]
236 fn sql(&self) -> &'q str {
237 self.inner.sql()
238 }
239
240 #[inline]
241 fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
242 self.inner.statement()
243 }
244
245 #[inline]
246 fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
247 self.inner.take_arguments()
248 }
249
250 #[inline]
251 fn persistent(&self) -> bool {
252 self.inner.arguments.is_some()
253 }
254}
255
256impl<'q, DB, F, O, A> Map<'q, DB, F, A>
257where
258 DB: Database,
259 F: FnMut(DB::Row) -> Result<O, Error> + Send,
260 O: Send + Unpin,
261 A: 'q + Send + IntoArguments<'q, DB>,
262{
263 #[inline]
270 pub fn map<G, P>(
271 self,
272 mut g: G,
273 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
274 where
275 G: FnMut(O) -> P + Send,
276 P: Unpin,
277 {
278 self.try_map(move |data| Ok(g(data)))
279 }
280
281 #[inline]
286 pub fn try_map<G, P>(
287 self,
288 mut g: G,
289 ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
290 where
291 G: FnMut(O) -> Result<P, Error> + Send,
292 P: Unpin,
293 {
294 let mut f = self.mapper;
295 Map {
296 inner: self.inner,
297 mapper: move |row| f(row).and_then(|o| g(o)),
298 }
299 }
300
301 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
303 where
304 'q: 'e,
305 E: 'e + Executor<'c, Database = DB>,
306 DB: 'e,
307 F: 'e,
308 O: 'e,
309 {
310 self.fetch_many(executor)
311 .try_filter_map(|step| async move {
312 Ok(match step {
313 Either::Left(_) => None,
314 Either::Right(o) => Some(o),
315 })
316 })
317 .boxed()
318 }
319
320 pub fn fetch_many<'e, 'c: 'e, E>(
323 mut self,
324 executor: E,
325 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
326 where
327 'q: 'e,
328 E: 'e + Executor<'c, Database = DB>,
329 DB: 'e,
330 F: 'e,
331 O: 'e,
332 {
333 Box::pin(try_stream! {
334 let mut s = executor.fetch_many(self.inner);
335
336 while let Some(v) = s.try_next().await? {
337 r#yield!(match v {
338 Either::Left(v) => Either::Left(v),
339 Either::Right(row) => {
340 Either::Right((self.mapper)(row)?)
341 }
342 });
343 }
344
345 Ok(())
346 })
347 }
348
349 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
351 where
352 'q: 'e,
353 E: 'e + Executor<'c, Database = DB>,
354 DB: 'e,
355 F: 'e,
356 O: 'e,
357 {
358 self.fetch(executor).try_collect().await
359 }
360
361 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
363 where
364 'q: 'e,
365 E: 'e + Executor<'c, Database = DB>,
366 DB: 'e,
367 F: 'e,
368 O: 'e,
369 {
370 self.fetch_optional(executor)
371 .and_then(|row| match row {
372 Some(row) => future::ok(row),
373 None => future::err(Error::RowNotFound),
374 })
375 .await
376 }
377
378 pub async fn fetch_optional<'e, 'c: 'e, E>(mut self, executor: E) -> Result<Option<O>, Error>
380 where
381 'q: 'e,
382 E: 'e + Executor<'c, Database = DB>,
383 DB: 'e,
384 F: 'e,
385 O: 'e,
386 {
387 let row = executor.fetch_optional(self.inner).await?;
388
389 if let Some(row) = row {
390 (self.mapper)(row).map(Some)
391 } else {
392 Ok(None)
393 }
394 }
395}
396
397pub(crate) fn query_statement<'q, DB>(
399 statement: &'q <DB as HasStatement<'q>>::Statement,
400) -> Query<'q, DB, <DB as HasArguments<'_>>::Arguments>
401where
402 DB: Database,
403{
404 Query {
405 database: PhantomData,
406 arguments: Some(Default::default()),
407 statement: Either::Right(statement),
408 persistent: true,
409 }
410}
411
412pub(crate) fn query_statement_with<'q, DB, A>(
414 statement: &'q <DB as HasStatement<'q>>::Statement,
415 arguments: A,
416) -> Query<'q, DB, A>
417where
418 DB: Database,
419 A: IntoArguments<'q, DB>,
420{
421 Query {
422 database: PhantomData,
423 arguments: Some(arguments),
424 statement: Either::Right(statement),
425 persistent: true,
426 }
427}
428
429pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as HasArguments<'_>>::Arguments>
431where
432 DB: Database,
433{
434 Query {
435 database: PhantomData,
436 arguments: Some(Default::default()),
437 statement: Either::Left(sql),
438 persistent: true,
439 }
440}
441
442pub fn query_with<'q, DB, A>(sql: &'q str, arguments: A) -> Query<'q, DB, A>
444where
445 DB: Database,
446 A: IntoArguments<'q, DB>,
447{
448 Query {
449 database: PhantomData,
450 arguments: Some(arguments),
451 statement: Either::Left(sql),
452 persistent: true,
453 }
454}