xitca_postgres/execute/
async_impl.rs

1use core::{
2    future::{Future, Ready, ready},
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use crate::{
8    BoxedFuture,
9    driver::codec::AsParams,
10    error::Error,
11    prepare::Prepare,
12    query::{Query, RowAffected, RowSimpleStream, RowStream, RowStreamGuarded, RowStreamOwned},
13    statement::{
14        Statement, StatementCreate, StatementGuarded, StatementNamed, StatementPreparedQuery,
15        StatementPreparedQueryOwned, StatementQuery, StatementSingleRTTQuery,
16    },
17};
18
19use super::Execute;
20
21impl<'s, C> Execute<&C> for &'s Statement
22where
23    C: Query,
24{
25    type ExecuteOutput = ResultFuture<RowAffected>;
26    type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
27
28    #[inline]
29    fn execute(self, cli: &C) -> Self::ExecuteOutput {
30        self.bind_none().execute(cli)
31    }
32
33    #[inline]
34    fn query(self, cli: &C) -> Self::QueryOutput {
35        self.bind_none().query(cli)
36    }
37}
38
39impl<C> Execute<&C> for &str
40where
41    C: Query,
42{
43    type ExecuteOutput = ResultFuture<RowAffected>;
44    type QueryOutput = Ready<Result<RowSimpleStream, Error>>;
45
46    #[inline]
47    fn execute(self, cli: &C) -> Self::ExecuteOutput {
48        cli._query(self).map(RowAffected::from).into()
49    }
50
51    #[inline]
52    fn query(self, cli: &C) -> Self::QueryOutput {
53        ready(cli._query(self))
54    }
55}
56
57type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result<Statement, Error>>, C>;
58
59pub struct IntoGuarded<'a, F, C> {
60    fut: F,
61    cli: &'a C,
62}
63
64impl<'a, F, C> Future for IntoGuarded<'a, F, C>
65where
66    F: Future<Output = Result<Statement, Error>> + Unpin,
67    C: Query,
68{
69    type Output = Result<StatementGuarded<'a, C>, Error>;
70
71    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        let this = self.get_mut();
73        Pin::new(&mut this.fut)
74            .poll(cx)
75            .map_ok(|stmt| stmt.into_guarded(this.cli))
76    }
77}
78
79impl<'c, C> Execute<&'c C> for StatementNamed<'_>
80where
81    C: Prepare,
82{
83    type ExecuteOutput = ResultFuture<IntoGuardedFuture<'c, C>>;
84    type QueryOutput = Self::ExecuteOutput;
85
86    #[inline]
87    fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
88        cli._query(StatementCreate::from((self, cli)))
89            .map(|fut| IntoGuarded { fut, cli })
90            .into()
91    }
92
93    #[inline]
94    fn query(self, cli: &'c C) -> Self::QueryOutput {
95        self.execute(cli)
96    }
97}
98
99impl<'s, C, P> Execute<&C> for StatementPreparedQuery<'s, P>
100where
101    C: Query,
102    P: AsParams,
103{
104    type ExecuteOutput = ResultFuture<RowAffected>;
105    type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
106
107    #[inline]
108    fn execute(self, cli: &C) -> Self::ExecuteOutput {
109        cli._query(self).map(RowAffected::from).into()
110    }
111
112    #[inline]
113    fn query(self, cli: &C) -> Self::QueryOutput {
114        ready(cli._query(self))
115    }
116}
117
118impl<'s, C, P> Execute<&C> for StatementPreparedQueryOwned<'s, P>
119where
120    C: Query,
121    P: AsParams,
122{
123    type ExecuteOutput = ResultFuture<RowAffected>;
124    type QueryOutput = Ready<Result<RowStreamOwned, Error>>;
125
126    #[inline]
127    fn execute(self, cli: &C) -> Self::ExecuteOutput {
128        cli._query(self).map(RowAffected::from).into()
129    }
130
131    #[inline]
132    fn query(self, cli: &C) -> Self::QueryOutput {
133        ready(cli._query(self))
134    }
135}
136
137impl<'c, C, P> Execute<&'c C> for StatementQuery<'_, P>
138where
139    C: Prepare,
140    P: AsParams,
141{
142    type ExecuteOutput = ResultFuture<RowAffected>;
143    type QueryOutput = Ready<Result<RowStreamGuarded<'c, C>, Error>>;
144
145    #[inline]
146    fn execute(self, cli: &C) -> Self::ExecuteOutput {
147        self.into_single_rtt().execute(cli)
148    }
149
150    #[inline]
151    fn query(self, cli: &'c C) -> Self::QueryOutput {
152        self.into_single_rtt().query(cli)
153    }
154}
155
156impl<'c, C, P> Execute<&'c C> for StatementSingleRTTQuery<'_, P>
157where
158    C: Prepare,
159    P: AsParams,
160{
161    type ExecuteOutput = ResultFuture<RowAffected>;
162    type QueryOutput = Ready<Result<RowStreamGuarded<'c, C>, Error>>;
163
164    #[inline]
165    fn execute(self, cli: &C) -> Self::ExecuteOutput {
166        cli._query(self.into_with_cli(cli)).map(RowAffected::from).into()
167    }
168
169    #[inline]
170    fn query(self, cli: &'c C) -> Self::QueryOutput {
171        ready(cli._query(self.into_with_cli(cli)))
172    }
173}
174
175#[cfg(not(feature = "nightly"))]
176impl<'c, C> Execute<&'c C> for &std::path::Path
177where
178    C: Query + Sync,
179{
180    type ExecuteOutput = BoxedFuture<'c, Result<u64, Error>>;
181    type QueryOutput = BoxedFuture<'c, Result<RowSimpleStream, Error>>;
182
183    #[inline]
184    fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
185        let path = self.to_path_buf();
186        Box::pin(async move {
187            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
188                .await
189                .unwrap()?
190                .execute(cli)
191                .await
192        })
193    }
194
195    #[inline]
196    fn query(self, cli: &'c C) -> Self::QueryOutput {
197        let path = self.to_path_buf();
198        Box::pin(async move {
199            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
200                .await
201                .unwrap()?
202                .query(cli)
203                .await
204        })
205    }
206}
207
208#[cfg(feature = "nightly")]
209impl<'c, C> Execute<&'c C> for &std::path::Path
210where
211    C: Query + Sync,
212{
213    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
214    type QueryOutput = impl Future<Output = Result<RowSimpleStream, Error>> + Send + 'c;
215
216    #[inline]
217    fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
218        let path = self.to_path_buf();
219        async move {
220            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
221                .await
222                .unwrap()?
223                .execute(cli)
224                .await
225        }
226    }
227
228    #[inline]
229    fn query(self, cli: &'c C) -> Self::QueryOutput {
230        let path = self.to_path_buf();
231        async move {
232            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
233                .await
234                .unwrap()?
235                .query(cli)
236                .await
237        }
238    }
239}
240
241pub struct ResultFuture<F>(Result<F, Option<Error>>);
242
243impl<F> From<Result<F, Error>> for ResultFuture<F> {
244    fn from(res: Result<F, Error>) -> Self {
245        Self(res.map_err(Some))
246    }
247}
248
249impl<F, T> Future for ResultFuture<F>
250where
251    F: Future<Output = Result<T, Error>> + Unpin,
252{
253    type Output = F::Output;
254
255    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256        match self.get_mut().0 {
257            Ok(ref mut res) => Pin::new(res).poll(cx),
258            Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())),
259        }
260    }
261}