1use core::{
2 future::{ready, Future, Ready},
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use crate::{
8 driver::codec::AsParams,
9 error::Error,
10 prepare::Prepare,
11 query::{Query, RowAffected, RowSimpleStream, RowStream, RowStreamGuarded},
12 statement::{
13 Statement, StatementCreate, StatementGuarded, StatementNamed, StatementQuery, StatementUnnamedBind,
14 StatementUnnamedQuery,
15 },
16 BoxedFuture,
17};
18
19use super::Execute;
20
21impl<'s, C> Execute<'_, C> for &'s Statement
22where
23 C: Query,
24{
25 type ExecuteOutput = ResultFuture<RowAffected>;
26 type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
27
28 #[inline]
29 fn execute(self, cli: &C) -> Self::ExecuteOutput {
30 cli._query(self).map(RowAffected::from).into()
31 }
32
33 #[inline]
34 fn query(self, cli: &C) -> Self::QueryOutput {
35 ready(cli._query(self))
36 }
37}
38
39impl<'s, C> Execute<'_, C> for &'s str
40where
41 C: Query,
42{
43 type ExecuteOutput = ResultFuture<RowAffected>;
44 type QueryOutput = Ready<Result<RowSimpleStream, Error>>;
45
46 #[inline]
47 fn execute(self, cli: &C) -> Self::ExecuteOutput {
48 cli._query(self).map(RowAffected::from).into()
49 }
50
51 #[inline]
52 fn query(self, cli: &C) -> Self::QueryOutput {
53 ready(cli._query(self))
54 }
55}
56
57type IntoGuardedFuture<'c, C> = IntoGuarded<'c, BoxedFuture<'c, Result<Statement, Error>>, C>;
58
59pub struct IntoGuarded<'a, F, C> {
60 fut: F,
61 cli: &'a C,
62}
63
64impl<'a, F, C> Future for IntoGuarded<'a, F, C>
65where
66 F: Future<Output = Result<Statement, Error>> + Unpin,
67 C: Query,
68{
69 type Output = Result<StatementGuarded<'a, C>, Error>;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 let this = self.get_mut();
73 Pin::new(&mut this.fut)
74 .poll(cx)
75 .map_ok(|stmt| stmt.into_guarded(this.cli))
76 }
77}
78
79impl<'c, C> Execute<'c, C> for StatementNamed<'_>
80where
81 C: Prepare + 'c,
82{
83 type ExecuteOutput = ResultFuture<IntoGuardedFuture<'c, C>>;
84 type QueryOutput = Self::ExecuteOutput;
85
86 #[inline]
87 fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
88 cli._query(StatementCreate::from((self, cli)))
89 .map(|fut| IntoGuarded { fut, cli })
90 .into()
91 }
92
93 #[inline]
94 fn query(self, cli: &'c C) -> Self::QueryOutput {
95 self.execute(cli)
96 }
97}
98
99impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P>
100where
101 C: Query,
102 P: AsParams,
103{
104 type ExecuteOutput = ResultFuture<RowAffected>;
105 type QueryOutput = Ready<Result<RowStream<'s>, Error>>;
106
107 #[inline]
108 fn execute(self, cli: &C) -> Self::ExecuteOutput {
109 cli._query(self).map(RowAffected::from).into()
110 }
111
112 #[inline]
113 fn query(self, cli: &C) -> Self::QueryOutput {
114 ready(cli._query(self))
115 }
116}
117
118impl<'c, C, P> Execute<'c, C> for StatementUnnamedBind<'_, P>
119where
120 C: Prepare + 'c,
121 P: AsParams,
122{
123 type ExecuteOutput = ResultFuture<RowAffected>;
124 type QueryOutput = Ready<Result<RowStreamGuarded<'c, C>, Error>>;
125
126 #[inline]
127 fn execute(self, cli: &C) -> Self::ExecuteOutput {
128 cli._query(StatementUnnamedQuery::from((self, cli)))
129 .map(RowAffected::from)
130 .into()
131 }
132
133 #[inline]
134 fn query(self, cli: &'c C) -> Self::QueryOutput {
135 ready(cli._query(StatementUnnamedQuery::from((self, cli))))
136 }
137}
138
139impl<'c, C> Execute<'c, C> for &std::path::Path
140where
141 C: Query + Sync + 'c,
142{
143 type ExecuteOutput = BoxedFuture<'c, Result<u64, Error>>;
144 type QueryOutput = BoxedFuture<'c, Result<RowSimpleStream, Error>>;
145
146 #[inline]
147 fn execute(self, cli: &'c C) -> Self::ExecuteOutput {
148 let path = self.to_path_buf();
149 Box::pin(async move {
150 tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
151 .await
152 .unwrap()?
153 .execute(cli)
154 .await
155 })
156 }
157
158 #[inline]
159 fn query(self, cli: &'c C) -> Self::QueryOutput {
160 let path = self.to_path_buf();
161 Box::pin(async move {
162 tokio::task::spawn_blocking(|| std::fs::read_to_string(path))
163 .await
164 .unwrap()?
165 .query(cli)
166 .await
167 })
168 }
169}
170
171pub struct ResultFuture<F>(Result<F, Option<Error>>);
172
173impl<F> From<Result<F, Error>> for ResultFuture<F> {
174 fn from(res: Result<F, Error>) -> Self {
175 Self(res.map_err(Some))
176 }
177}
178
179impl<F, T> Future for ResultFuture<F>
180where
181 F: Future<Output = Result<T, Error>> + Unpin,
182{
183 type Output = F::Output;
184
185 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186 match self.get_mut().0 {
187 Ok(ref mut res) => Pin::new(res).poll(cx),
188 Err(ref mut e) => Poll::Ready(Err(e.take().unwrap())),
189 }
190 }
191}