1use std::marker::PhantomData;
2
3use either::Either;
4use futures_core::stream::BoxStream;
5use futures_util::{StreamExt, TryStreamExt};
6
7use crate::arguments::IntoArguments;
8use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
9use crate::encode::Encode;
10use crate::error::Error;
11use crate::executor::{Execute, Executor};
12use crate::from_row::FromRow;
13use crate::query::{query, query_statement, query_statement_with, query_with, Query};
14use crate::types::Type;
15
16#[must_use = "query must be executed to affect database"]
19pub struct QueryAs<'q, DB: Database, O, A> {
20 pub(crate) inner: Query<'q, DB, A>,
21 pub(crate) output: PhantomData<O>,
22}
23
24impl<'q, DB: Database, O: Send, A: Send> Execute<'q, DB> for QueryAs<'q, DB, O, A>
25where
26 DB: Database,
27 A: 'q + IntoArguments<'q, DB>,
28{
29 #[inline]
30 fn sql(&self) -> &'q str {
31 self.inner.sql()
32 }
33
34 #[inline]
35 fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
36 self.inner.statement()
37 }
38
39 #[inline]
40 fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
41 self.inner.take_arguments()
42 }
43
44 #[inline]
45 fn persistent(&self) -> bool {
46 self.inner.persistent()
47 }
48}
49
50impl<'q, DB: Database, O> QueryAs<'q, DB, O, <DB as HasArguments<'q>>::Arguments> {
51 pub fn bind<T: 'q + Send + Encode<'q, DB> + Type<DB>>(mut self, value: T) -> Self {
55 self.inner = self.inner.bind(value);
56 self
57 }
58}
59
60impl<'q, DB, O, A> QueryAs<'q, DB, O, A>
61where
62 DB: Database + HasStatementCache,
63{
64 pub fn persistent(mut self, value: bool) -> Self {
73 self.inner = self.inner.persistent(value);
74 self
75 }
76}
77
78impl<'q, DB, O, A> QueryAs<'q, DB, O, A>
81where
82 DB: Database,
83 A: 'q + IntoArguments<'q, DB>,
84 O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
85{
86 pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
88 where
89 'q: 'e,
90 E: 'e + Executor<'c, Database = DB>,
91 DB: 'e,
92 O: 'e,
93 A: 'e,
94 {
95 self.fetch_many(executor)
96 .try_filter_map(|step| async move { Ok(step.right()) })
97 .boxed()
98 }
99
100 pub fn fetch_many<'e, 'c: 'e, E>(
103 self,
104 executor: E,
105 ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
106 where
107 'q: 'e,
108 E: 'e + Executor<'c, Database = DB>,
109 DB: 'e,
110 O: 'e,
111 A: 'e,
112 {
113 executor
114 .fetch_many(self.inner)
115 .map(|v| match v {
116 Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
117 Ok(Either::Left(v)) => Ok(Either::Left(v)),
118 Err(e) => Err(e),
119 })
120 .boxed()
121 }
122
123 #[inline]
125 pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
126 where
127 'q: 'e,
128 E: 'e + Executor<'c, Database = DB>,
129 DB: 'e,
130 O: 'e,
131 A: 'e,
132 {
133 self.fetch(executor).try_collect().await
134 }
135
136 pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
138 where
139 'q: 'e,
140 E: 'e + Executor<'c, Database = DB>,
141 DB: 'e,
142 O: 'e,
143 A: 'e,
144 {
145 self.fetch_optional(executor)
146 .await
147 .and_then(|row| row.ok_or(Error::RowNotFound))
148 }
149
150 pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<O>, Error>
152 where
153 'q: 'e,
154 E: 'e + Executor<'c, Database = DB>,
155 DB: 'e,
156 O: 'e,
157 A: 'e,
158 {
159 let row = executor.fetch_optional(self.inner).await?;
160 if let Some(row) = row {
161 O::from_row(&row).map(Some)
162 } else {
163 Ok(None)
164 }
165 }
166}
167
168#[inline]
171pub fn query_as<'q, DB, O>(sql: &'q str) -> QueryAs<'q, DB, O, <DB as HasArguments<'q>>::Arguments>
172where
173 DB: Database,
174 O: for<'r> FromRow<'r, DB::Row>,
175{
176 QueryAs {
177 inner: query(sql),
178 output: PhantomData,
179 }
180}
181
182#[inline]
185pub fn query_as_with<'q, DB, O, A>(sql: &'q str, arguments: A) -> QueryAs<'q, DB, O, A>
186where
187 DB: Database,
188 A: IntoArguments<'q, DB>,
189 O: for<'r> FromRow<'r, DB::Row>,
190{
191 QueryAs {
192 inner: query_with(sql, arguments),
193 output: PhantomData,
194 }
195}
196
197pub(crate) fn query_statement_as<'q, DB, O>(
199 statement: &'q <DB as HasStatement<'q>>::Statement,
200) -> QueryAs<'q, DB, O, <DB as HasArguments<'q>>::Arguments>
201where
202 DB: Database,
203 O: for<'r> FromRow<'r, DB::Row>,
204{
205 QueryAs {
206 inner: query_statement(statement),
207 output: PhantomData,
208 }
209}
210
211pub(crate) fn query_statement_as_with<'q, DB, O, A>(
213 statement: &'q <DB as HasStatement<'q>>::Statement,
214 arguments: A,
215) -> QueryAs<'q, DB, O, A>
216where
217 DB: Database,
218 A: IntoArguments<'q, DB>,
219 O: for<'r> FromRow<'r, DB::Row>,
220{
221 QueryAs {
222 inner: query_statement_with(statement, arguments),
223 output: PhantomData,
224 }
225}