Skip to main content

xitca_postgres/execute/
async_impl.rs

1use core::{
2    future::{Future, Ready, ready},
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use crate::{
8    BoxedFuture,
9    client::{Client, ClientBorrow},
10    driver::codec::AsParams,
11    error::Error,
12    query::{RowAffected, RowSimpleStream, RowStream, RowStreamGuarded, RowStreamOwned},
13    statement::{
14        Statement, StatementCreate, StatementGuarded, StatementNamed, StatementPreparedQuery,
15        StatementPreparedQueryOwned, StatementQuery, StatementSingleRTTQuery,
16    },
17};
18
19use super::Execute;
20
21impl<'s> Execute<&Client> for &'s Statement {
22    type ExecuteOutput = ResultFuture<RowAffected>;
23    type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
24
25    #[inline]
26    fn execute(self, cli: &Client) -> Self::ExecuteOutput {
27        self.bind_none().execute(cli)
28    }
29
30    #[inline]
31    fn query(self, cli: &Client) -> Self::QueryOutput {
32        self.bind_none().query(cli)
33    }
34}
35
36impl Execute<&Client> for &str {
37    type ExecuteOutput = ResultFuture<RowAffected>;
38    type QueryOutput = Ready<Result<RowSimpleStream, Error>>;
39
40    #[inline]
41    fn execute(self, cli: &Client) -> Self::ExecuteOutput {
42        cli.query(self).map(RowAffected::from).into()
43    }
44
45    #[inline]
46    fn query(self, cli: &Client) -> Self::QueryOutput {
47        ready(cli.query(self))
48    }
49}
50
51type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result<Statement, Error>>, C>;
52
/// Pairs an inner future with a borrowed client.
///
/// The `Future` implementation in this file polls `fut` and, on success,
/// hands `cli` to `Statement::into_guarded`.
pub struct IntoGuarded<'a, F, C> {
    /// Inner future being driven.
    fut: F,
    /// Borrowed client passed along once `fut` resolves successfully.
    cli: &'a C,
}
57
58impl<'a, F, C> Future for IntoGuarded<'a, F, C>
59where
60    C: ClientBorrow,
61    F: Future<Output = Result<Statement, Error>> + Unpin,
62{
63    type Output = Result<StatementGuarded<'a, C>, Error>;
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let this = self.get_mut();
67        Pin::new(&mut this.fut)
68            .poll(cx)
69            .map_ok(|stmt| stmt.into_guarded(this.cli))
70    }
71}
72
73impl<'c> Execute<&'c Client> for StatementNamed<'_> {
74    type ExecuteOutput = ResultFuture<IntoGuardedFuture<'c, Client>>;
75    type QueryOutput = Self::ExecuteOutput;
76
77    #[inline]
78    fn execute(self, cli: &'c Client) -> Self::ExecuteOutput {
79        cli.query(StatementCreate::from((self, cli)))
80            .map(|fut| IntoGuarded { fut, cli })
81            .into()
82    }
83
84    #[inline]
85    fn query(self, cli: &'c Client) -> Self::QueryOutput {
86        self.execute(cli)
87    }
88}
89
90impl<'s, P> Execute<&Client> for StatementPreparedQuery<'s, P>
91where
92    P: AsParams,
93{
94    type ExecuteOutput = ResultFuture<RowAffected>;
95    type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
96
97    #[inline]
98    fn execute(self, cli: &Client) -> Self::ExecuteOutput {
99        cli.query(self).map(RowAffected::from).into()
100    }
101
102    #[inline]
103    fn query(self, cli: &Client) -> Self::QueryOutput {
104        ready(cli.query(self))
105    }
106}
107
108impl<'s, P> Execute<&Client> for StatementPreparedQueryOwned<'s, P>
109where
110    P: AsParams,
111{
112    type ExecuteOutput = ResultFuture<RowAffected>;
113    type QueryOutput = Ready<Result<RowStreamOwned, Error>>;
114
115    #[inline]
116    fn execute(self, cli: &Client) -> Self::ExecuteOutput {
117        cli.query(self).map(RowAffected::from).into()
118    }
119
120    #[inline]
121    fn query(self, cli: &Client) -> Self::QueryOutput {
122        ready(cli.query(self))
123    }
124}
125
126impl<'c, P> Execute<&'c Client> for StatementQuery<'_, P>
127where
128    P: AsParams,
129{
130    type ExecuteOutput = ResultFuture<RowAffected>;
131    type QueryOutput = Ready<Result<RowStreamGuarded<'c, Client>, Error>>;
132
133    #[inline]
134    fn execute(self, cli: &Client) -> Self::ExecuteOutput {
135        self.into_single_rtt().execute(cli)
136    }
137
138    #[inline]
139    fn query(self, cli: &'c Client) -> Self::QueryOutput {
140        self.into_single_rtt().query(cli)
141    }
142}
143
144impl<'c, P> Execute<&'c Client> for StatementSingleRTTQuery<'_, P>
145where
146    P: AsParams,
147{
148    type ExecuteOutput = ResultFuture<RowAffected>;
149    type QueryOutput = Ready<Result<RowStreamGuarded<'c, Client>, Error>>;
150
151    #[inline]
152    fn execute(self, cli: &Client) -> Self::ExecuteOutput {
153        cli.query(self.into_with_cli(cli)).map(RowAffected::from).into()
154    }
155
156    #[inline]
157    fn query(self, cli: &'c Client) -> Self::QueryOutput {
158        ready(cli.query(self.into_with_cli(cli)))
159    }
160}
161
162#[cfg(not(feature = "nightly"))]
163impl<'c> Execute<&'c Client> for &std::path::Path {
164    type ExecuteOutput = BoxedFuture<'c, Result<u64, Error>>;
165    type QueryOutput = BoxedFuture<'c, Result<RowSimpleStream, Error>>;
166
167    #[inline]
168    fn execute(self, cli: &'c Client) -> Self::ExecuteOutput {
169        let path = self.to_path_buf();
170        Box::pin(async move {
171            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
172                .await
173                .unwrap()?
174                .execute(cli)
175                .await
176        })
177    }
178
179    #[inline]
180    fn query(self, cli: &'c Client) -> Self::QueryOutput {
181        let path = self.to_path_buf();
182        Box::pin(async move {
183            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
184                .await
185                .unwrap()?
186                .query(cli)
187                .await
188        })
189    }
190}
191
/// [`Execute`] for a filesystem path (nightly variant, unboxed `impl Future`
/// associated types).
///
/// The file is read to a `String` on tokio's blocking thread pool and the
/// loaded text is then run through its own `execute` / `query` call.
///
/// # Errors
///
/// The returned futures resolve to `Err` when:
/// - the blocking read task could not be joined (it was cancelled or
///   panicked); the join error is converted into an `std::io::Error` instead
///   of panicking inside the future,
/// - reading the file fails,
/// - running the loaded text against the client fails.
#[cfg(feature = "nightly")]
impl<'c> Execute<&'c Client> for &std::path::Path {
    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<RowSimpleStream, Error>> + Send + 'c;

    #[inline]
    fn execute(self, cli: &'c Client) -> Self::ExecuteOutput {
        // Own the path up front so the returned future does not borrow `self`.
        let path = self.to_path_buf();
        async move {
            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
                .await
                // First `?` surfaces a join failure, second `?` a read failure.
                .map_err(std::io::Error::from)??
                .execute(cli)
                .await
        }
    }

    #[inline]
    fn query(self, cli: &'c Client) -> Self::QueryOutput {
        // Own the path up front so the returned future does not borrow `self`.
        let path = self.to_path_buf();
        async move {
            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
                .await
                // First `?` surfaces a join failure, second `?` a read failure.
                .map_err(std::io::Error::from)??
                .query(cli)
                .await
        }
    }
}
221
222pub struct ResultFuture<F>(Result<F, Option<Error>>);
223
224impl<F> From<Result<F, Error>> for ResultFuture<F> {
225    fn from(res: Result<F, Error>) -> Self {
226        Self(res.map_err(Some))
227    }
228}
229
230impl<F, T> Future for ResultFuture<F>
231where
232    F: Future<Output = Result<T, Error>> + Unpin,
233{
234    type Output = F::Output;
235
236    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237        match self.get_mut().0 {
238            Ok(ref mut res) => Pin::new(res).poll(cx),
239            Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())),
240        }
241    }
242}