xitca_postgres/execute/
async_impl.rs

1use core::{
2    future::{ready, Future, Ready},
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use crate::{
8    driver::codec::AsParams,
9    error::Error,
10    prepare::Prepare,
11    query::{Query, RowAffected, RowSimpleStream, RowStream, RowStreamGuarded},
12    statement::{
13        Statement, StatementCreate, StatementGuarded, StatementNamed, StatementQuery, StatementUnnamedBind,
14        StatementUnnamedQuery,
15    },
16    BoxedFuture,
17};
18
19use super::Execute;
20
21impl<'s, C> Execute<'_, C> for &'s Statement
22where
23    C: Query,
24{
25    type ExecuteOutput = ResultFuture<RowAffected>;
26    type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
27
28    #[inline]
29    fn execute(self, cli: &C) -> Self::ExecuteOutput {
30        cli._query(self).map(RowAffected::from).into()
31    }
32
33    #[inline]
34    fn query(self, cli: &C) -> Self::QueryOutput {
35        ready(cli._query(self))
36    }
37}
38
39impl<'s, C> Execute<'_, C> for &'s str
40where
41    C: Query,
42{
43    type ExecuteOutput = ResultFuture<RowAffected>;
44    type QueryOutput = Ready<Result<RowSimpleStream, Error>>;
45
46    #[inline]
47    fn execute(self, cli: &C) -> Self::ExecuteOutput {
48        cli._query(self).map(RowAffected::from).into()
49    }
50
51    #[inline]
52    fn query(self, cli: &C) -> Self::QueryOutput {
53        ready(cli._query(self))
54    }
55}
56
57type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result<Statement, Error>>, C>;
58
59pub struct IntoGuarded<'a, F, C> {
60    fut: F,
61    cli: &'a C,
62}
63
64impl<'a, F, C> Future for IntoGuarded<'a, F, C>
65where
66    F: Future<Output = Result<Statement, Error>> + Unpin,
67    C: Query,
68{
69    type Output = Result<StatementGuarded<'a, C>, Error>;
70
71    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        let this = self.get_mut();
73        Pin::new(&mut this.fut)
74            .poll(cx)
75            .map_ok(|stmt| stmt.into_guarded(this.cli))
76    }
77}
78
79impl<'c, C> Execute<'c, C> for StatementNamed<'_>
80where
81    C: Prepare + 'c,
82{
83    type ExecuteOutput = ResultFuture<IntoGuardedFuture<'c, C>>;
84    type QueryOutput = Self::ExecuteOutput;
85
86    #[inline]
87    fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
88        cli._query(StatementCreate::from((self, cli)))
89            .map(|fut| IntoGuarded { fut, cli })
90            .into()
91    }
92
93    #[inline]
94    fn query(self, cli: &'c C) -> Self::QueryOutput {
95        self.execute(cli)
96    }
97}
98
99impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P>
100where
101    C: Query,
102    P: AsParams,
103{
104    type ExecuteOutput = ResultFuture<RowAffected>;
105    type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
106
107    #[inline]
108    fn execute(self, cli: &C) -> Self::ExecuteOutput {
109        cli._query(self).map(RowAffected::from).into()
110    }
111
112    #[inline]
113    fn query(self, cli: &C) -> Self::QueryOutput {
114        ready(cli._query(self))
115    }
116}
117
118impl<'c, C, P> Execute<'c, C> for StatementUnnamedBind<'_, P>
119where
120    C: Prepare + 'c,
121    P: AsParams,
122{
123    type ExecuteOutput = ResultFuture<RowAffected>;
124    type QueryOutput = Ready<Result<RowStreamGuarded<'c, C>, Error>>;
125
126    #[inline]
127    fn execute(self, cli: &C) -> Self::ExecuteOutput {
128        cli._query(StatementUnnamedQuery::from((self, cli)))
129            .map(RowAffected::from)
130            .into()
131    }
132
133    #[inline]
134    fn query(self, cli: &'c C) -> Self::QueryOutput {
135        ready(cli._query(StatementUnnamedQuery::from((self, cli))))
136    }
137}
138
139impl<'c, C> Execute<'c, C> for &std::path::Path
140where
141    C: Query + Sync + 'c,
142{
143    type ExecuteOutput = BoxedFuture<'c, Result<u64, Error>>;
144    type QueryOutput = BoxedFuture<'c, Result<RowSimpleStream, Error>>;
145
146    #[inline]
147    fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
148        let path = self.to_path_buf();
149        Box::pin(async move {
150            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
151                .await
152                .unwrap()?
153                .execute(cli)
154                .await
155        })
156    }
157
158    #[inline]
159    fn query(self, cli: &'c C) -> Self::QueryOutput {
160        let path = self.to_path_buf();
161        Box::pin(async move {
162            tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
163                .await
164                .unwrap()?
165                .query(cli)
166                .await
167        })
168    }
169}
170
171pub struct ResultFuture<F>(Result<F, Option<Error>>);
172
173impl<F> From<Result<F, Error>> for ResultFuture<F> {
174    fn from(res: Result<F, Error>) -> Self {
175        Self(res.map_err(Some))
176    }
177}
178
179impl<F, T> Future for ResultFuture<F>
180where
181    F: Future<Output = Result<T, Error>> + Unpin,
182{
183    type Output = F::Output;
184
185    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186        match self.get_mut().0 {
187            Ok(ref mut res) => Pin::new(res).poll(cx),
188            Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())),
189        }
190    }
191}