1use core::{
2 future::{Future, Ready, ready},
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use crate::{
8 BoxedFuture,
9 driver::codec::AsParams,
10 error::Error,
11 prepare::Prepare,
12 query::{Query, RowAffected, RowSimpleStream, RowStream, RowStreamGuarded, RowStreamOwned},
13 statement::{
14 Statement, StatementCreate, StatementGuarded, StatementNamed, StatementPreparedQuery,
15 StatementPreparedQueryOwned, StatementQuery, StatementSingleRTTQuery,
16 },
17};
18
19use super::Execute;
20
21impl<'s, C> Execute<&C> for &'s Statement
22where
23 C: Query,
24{
25 type ExecuteOutput = ResultFuture<RowAffected>;
26 type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
27
28 #[inline]
29 fn execute(self, cli: &C) -> Self::ExecuteOutput {
30 self.bind_none().execute(cli)
31 }
32
33 #[inline]
34 fn query(self, cli: &C) -> Self::QueryOutput {
35 self.bind_none().query(cli)
36 }
37}
38
39impl<C> Execute<&C> for &str
40where
41 C: Query,
42{
43 type ExecuteOutput = ResultFuture<RowAffected>;
44 type QueryOutput = Ready<Result<RowSimpleStream, Error>>;
45
46 #[inline]
47 fn execute(self, cli: &C) -> Self::ExecuteOutput {
48 cli._query(self).map(RowAffected::from).into()
49 }
50
51 #[inline]
52 fn query(self, cli: &C) -> Self::QueryOutput {
53 ready(cli._query(self))
54 }
55}
56
57type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result<Statement, Error>>, C>;
58
59pub struct IntoGuarded<'a, F, C> {
60 fut: F,
61 cli: &'a C,
62}
63
64impl<'a, F, C> Future for IntoGuarded<'a, F, C>
65where
66 F: Future<Output = Result<Statement, Error>> + Unpin,
67 C: Query,
68{
69 type Output = Result<StatementGuarded<'a, C>, Error>;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 let this = self.get_mut();
73 Pin::new(&mut this.fut)
74 .poll(cx)
75 .map_ok(|stmt| stmt.into_guarded(this.cli))
76 }
77}
78
79impl<'c, C> Execute<&'c C> for StatementNamed<'_>
80where
81 C: Prepare,
82{
83 type ExecuteOutput = ResultFuture<IntoGuardedFuture<'c, C>>;
84 type QueryOutput = Self::ExecuteOutput;
85
86 #[inline]
87 fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
88 cli._query(StatementCreate::from((self, cli)))
89 .map(|fut| IntoGuarded { fut, cli })
90 .into()
91 }
92
93 #[inline]
94 fn query(self, cli: &'c C) -> Self::QueryOutput {
95 self.execute(cli)
96 }
97}
98
99impl<'s, C, P> Execute<&C> for StatementPreparedQuery<'s, P>
100where
101 C: Query,
102 P: AsParams,
103{
104 type ExecuteOutput = ResultFuture<RowAffected>;
105 type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
106
107 #[inline]
108 fn execute(self, cli: &C) -> Self::ExecuteOutput {
109 cli._query(self).map(RowAffected::from).into()
110 }
111
112 #[inline]
113 fn query(self, cli: &C) -> Self::QueryOutput {
114 ready(cli._query(self))
115 }
116}
117
118impl<'s, C, P> Execute<&C> for StatementPreparedQueryOwned<'s, P>
119where
120 C: Query,
121 P: AsParams,
122{
123 type ExecuteOutput = ResultFuture<RowAffected>;
124 type QueryOutput = Ready<Result<RowStreamOwned, Error>>;
125
126 #[inline]
127 fn execute(self, cli: &C) -> Self::ExecuteOutput {
128 cli._query(self).map(RowAffected::from).into()
129 }
130
131 #[inline]
132 fn query(self, cli: &C) -> Self::QueryOutput {
133 ready(cli._query(self))
134 }
135}
136
137impl<'c, C, P> Execute<&'c C> for StatementQuery<'_, P>
138where
139 C: Prepare,
140 P: AsParams,
141{
142 type ExecuteOutput = ResultFuture<RowAffected>;
143 type QueryOutput = Ready<Result<RowStreamGuarded<'c, C>, Error>>;
144
145 #[inline]
146 fn execute(self, cli: &C) -> Self::ExecuteOutput {
147 self.into_single_rtt().execute(cli)
148 }
149
150 #[inline]
151 fn query(self, cli: &'c C) -> Self::QueryOutput {
152 self.into_single_rtt().query(cli)
153 }
154}
155
156impl<'c, C, P> Execute<&'c C> for StatementSingleRTTQuery<'_, P>
157where
158 C: Prepare,
159 P: AsParams,
160{
161 type ExecuteOutput = ResultFuture<RowAffected>;
162 type QueryOutput = Ready<Result<RowStreamGuarded<'c, C>, Error>>;
163
164 #[inline]
165 fn execute(self, cli: &C) -> Self::ExecuteOutput {
166 cli._query(self.into_with_cli(cli)).map(RowAffected::from).into()
167 }
168
169 #[inline]
170 fn query(self, cli: &'c C) -> Self::QueryOutput {
171 ready(cli._query(self.into_with_cli(cli)))
172 }
173}
174
175#[cfg(not(feature = "nightly"))]
176impl<'c, C> Execute<&'c C> for &std::path::Path
177where
178 C: Query + Sync,
179{
180 type ExecuteOutput = BoxedFuture<'c, Result<u64, Error>>;
181 type QueryOutput = BoxedFuture<'c, Result<RowSimpleStream, Error>>;
182
183 #[inline]
184 fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
185 let path = self.to_path_buf();
186 Box::pin(async move {
187 tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
188 .await
189 .unwrap()?
190 .execute(cli)
191 .await
192 })
193 }
194
195 #[inline]
196 fn query(self, cli: &'c C) -> Self::QueryOutput {
197 let path = self.to_path_buf();
198 Box::pin(async move {
199 tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
200 .await
201 .unwrap()?
202 .query(cli)
203 .await
204 })
205 }
206}
207
/// `Execute` for a filesystem path (nightly variant, unboxed `impl Future`
/// associated types).
///
/// The file at the path is read to a `String` on tokio's blocking thread pool
/// and its contents are then executed/queried through the `&str`
/// implementation of `Execute`.
///
/// # Errors
///
/// The returned future resolves to an error when reading the file fails, when
/// the blocking task does not complete (it was cancelled or panicked), or
/// when running the resulting SQL text fails.
#[cfg(feature = "nightly")]
impl<'c, C> Execute<&'c C> for &std::path::Path
where
    C: Query + Sync,
{
    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<RowSimpleStream, Error>> + Send + 'c;

    #[inline]
    fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
        // Own the path so the future does not borrow from the caller's `&Path`.
        let path = self.to_path_buf();
        async move {
            // First `?`: the blocking task failed to complete; tokio converts its
            // `JoinError` into an `io::Error` so it is reported instead of panicking.
            // Second `?`: the file read itself failed.
            let sql = tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
                .await
                .map_err(std::io::Error::from)??;
            sql.execute(cli).await
        }
    }

    #[inline]
    fn query(self, cli: &'c C) -> Self::QueryOutput {
        // Own the path so the future does not borrow from the caller's `&Path`.
        let path = self.to_path_buf();
        async move {
            // First `?`: the blocking task failed to complete; tokio converts its
            // `JoinError` into an `io::Error` so it is reported instead of panicking.
            // Second `?`: the file read itself failed.
            let sql = tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
                .await
                .map_err(std::io::Error::from)??;
            sql.query(cli).await
        }
    }
}
240
241pub struct ResultFuture<F>(Result<F, Option<Error>>);
242
243impl<F> From<Result<F, Error>> for ResultFuture<F> {
244 fn from(res: Result<F, Error>) -> Self {
245 Self(res.map_err(Some))
246 }
247}
248
249impl<F, T> Future for ResultFuture<F>
250where
251 F: Future<Output = Result<T, Error>> + Unpin,
252{
253 type Output = F::Output;
254
255 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256 match self.get_mut().0 {
257 Ok(ref mut res) => Pin::new(res).poll(cx),
258 Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())),
259 }
260 }
261}