Skip to main content

xitca_postgres/pool/
execute.rs

1use std::sync::Arc;
2
3use crate::{
4    driver::codec::AsParams,
5    error::Error,
6    execute::Execute,
7    query::{RowSimpleStream, RowStreamOwned},
8    statement::StatementQuery,
9};
10
11use super::Pool;
12
13#[cfg(not(feature = "nightly"))]
14impl<'c, 's> Execute<&'c Pool> for &'s str
15where
16    's: 'c,
17{
18    type ExecuteOutput = crate::BoxedFuture<'c, Result<u64, Error>>;
19    type QueryOutput = crate::BoxedFuture<'c, Result<RowSimpleStream, Error>>;
20
21    #[inline]
22    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
23        Box::pin(async {
24            {
25                let conn = pool.get().await?;
26                self.execute(&conn)
27            }
28            // return connection to pool before await on execution future
29            .await
30        })
31    }
32
33    #[inline]
34    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
35        Box::pin(async {
36            let conn = pool.get().await?;
37            self.query(&conn).await
38        })
39    }
40}
41
/// Runs a plain string statement through a [`Pool`].
///
/// This variant is compiled when the `nightly` feature is enabled: the output futures are
/// opaque `impl Future` types instead of boxed futures.
#[cfg(feature = "nightly")]
impl<'c, 's> Execute<&'c Pool> for &'s str
where
    's: 'c,
{
    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<RowSimpleStream, Error>> + Send + 'c;

    /// Obtains a connection with `Pool::get`, starts the statement on it and resolves to the
    /// output of the execution future. A failure to obtain the connection is propagated with `?`.
    #[inline]
    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
        async move {
            // The connection only lives inside this inner scope, so it goes back to the pool
            // before the execution future itself is awaited.
            let pending = {
                let conn = pool.get().await?;
                self.execute(&conn)
            };

            pending.await
        }
    }

    /// Obtains a connection with `Pool::get` and awaits the query on it; the connection stays
    /// alive until the query future has resolved.
    #[inline]
    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
        async move {
            let connection = pool.get().await?;
            self.query(&connection).await
        }
    }
}
70
71#[cfg(not(feature = "nightly"))]
72impl<'c, 's, P> Execute<&'c Pool> for StatementQuery<'s, P>
73where
74    P: AsParams + Send + 'c,
75    's: 'c,
76{
77    type ExecuteOutput = crate::BoxedFuture<'c, Result<u64, Error>>;
78    type QueryOutput = crate::BoxedFuture<'c, Result<RowStreamOwned, Error>>;
79
80    #[inline]
81    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
82        Box::pin(async {
83            {
84                let mut conn = pool.get().await?;
85                self.execute(&mut conn).await?
86            }
87            // return connection to pool before await on execution future
88            .await
89        })
90    }
91
92    #[inline]
93    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
94        Box::pin(async {
95            let mut conn = pool.get().await?;
96            self.query(&mut conn).await
97        })
98    }
99}
100
/// Runs a single [`StatementQuery`] through a [`Pool`].
///
/// This variant is compiled when the `nightly` feature is enabled: the output futures are
/// opaque `impl Future` types instead of boxed futures.
#[cfg(feature = "nightly")]
impl<'c, 's, P> Execute<&'c Pool> for StatementQuery<'s, P>
where
    P: AsParams + Send + 'c,
    's: 'c,
{
    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<RowStreamOwned, Error>> + Send + 'c;

    /// Obtains a connection with `Pool::get`, awaits the first stage of execution on it (which
    /// yields a second future) and then awaits that second future for the final result.
    /// Errors from obtaining the connection or from the first stage are propagated with `?`.
    #[inline]
    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
        async move {
            // Only the first stage needs the connection; scoping it here returns it to the
            // pool before the resulting execution future is awaited.
            let pending = {
                let mut conn = pool.get().await?;
                self.execute(&mut conn).await?
            };

            pending.await
        }
    }

    /// Obtains a connection with `Pool::get` and awaits the query on it; the connection stays
    /// alive until the query future has resolved.
    #[inline]
    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
        async move {
            let mut connection = pool.get().await?;
            self.query(&mut connection).await
        }
    }
}
130
131#[cfg(not(feature = "nightly"))]
132impl<'c, 's, P, const N: usize> Execute<&'c Pool> for [StatementQuery<'s, P>; N]
133where
134    P: AsParams + Send + 'c,
135    's: 'c,
136{
137    type ExecuteOutput = crate::BoxedFuture<'c, Result<u64, Error>>;
138    type QueryOutput = crate::BoxedFuture<'c, Result<Vec<RowStreamOwned>, Error>>;
139
140    #[inline]
141    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
142        Box::pin(execute_iter_with_pool(self.into_iter(), pool))
143    }
144
145    #[inline]
146    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
147        Box::pin(query_iter_with_pool(self.into_iter(), pool))
148    }
149}
150
/// Runs a fixed-size array of [`StatementQuery`] values through a [`Pool`].
///
/// Compiled when the `nightly` feature is enabled; the futures produced by the shared
/// iterator helpers are returned directly as opaque `impl Future` types.
#[cfg(feature = "nightly")]
impl<'c, 's, P, const N: usize> Execute<&'c Pool> for [StatementQuery<'s, P>; N]
where
    P: AsParams + Send + 'c,
    's: 'c,
{
    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<Vec<RowStreamOwned>, Error>> + Send + 'c;

    /// Hands the array's by-value iterator to `execute_iter_with_pool`.
    #[inline]
    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
        let statements = IntoIterator::into_iter(self);
        execute_iter_with_pool(statements, pool)
    }

    /// Hands the array's by-value iterator to `query_iter_with_pool`.
    #[inline]
    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
        let statements = IntoIterator::into_iter(self);
        query_iter_with_pool(statements, pool)
    }
}
170
171#[cfg(not(feature = "nightly"))]
172impl<'c, 's, P> Execute<&'c Pool> for Vec<StatementQuery<'s, P>>
173where
174    P: AsParams + Send + 'c,
175    's: 'c,
176{
177    type ExecuteOutput = crate::BoxedFuture<'c, Result<u64, Error>>;
178    type QueryOutput = crate::BoxedFuture<'c, Result<Vec<RowStreamOwned>, Error>>;
179
180    #[inline]
181    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
182        Box::pin(execute_iter_with_pool(self.into_iter(), pool))
183    }
184
185    #[inline]
186    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
187        Box::pin(query_iter_with_pool(self.into_iter(), pool))
188    }
189}
190
/// Runs a `Vec` of [`StatementQuery`] values through a [`Pool`].
///
/// Compiled when the `nightly` feature is enabled; the futures produced by the shared
/// iterator helpers are returned directly as opaque `impl Future` types.
#[cfg(feature = "nightly")]
impl<'c, 's, P> Execute<&'c Pool> for Vec<StatementQuery<'s, P>>
where
    P: AsParams + Send + 'c,
    's: 'c,
{
    type ExecuteOutput = impl Future<Output = Result<u64, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<Vec<RowStreamOwned>, Error>> + Send + 'c;

    /// Hands the vector's by-value iterator to `execute_iter_with_pool`.
    #[inline]
    fn execute(self, pool: &'c Pool) -> Self::ExecuteOutput {
        let statements = IntoIterator::into_iter(self);
        execute_iter_with_pool(statements, pool)
    }

    /// Hands the vector's by-value iterator to `query_iter_with_pool`.
    #[inline]
    fn query(self, pool: &'c Pool) -> Self::QueryOutput {
        let statements = IntoIterator::into_iter(self);
        query_iter_with_pool(statements, pool)
    }
}
210
211async fn execute_iter_with_pool<P>(
212    iter: impl Iterator<Item = StatementQuery<'_, P>> + Send,
213    pool: &Pool,
214) -> Result<u64, Error>
215where
216    P: AsParams + Send,
217{
218    let mut res = Vec::with_capacity(iter.size_hint().0);
219
220    {
221        let mut conn = pool.get().await?;
222
223        for stmt in iter {
224            let fut = stmt.execute(&mut conn).await?;
225            res.push(fut);
226        }
227    }
228
229    let mut num = 0;
230
231    for res in res {
232        num += res.await?;
233    }
234
235    Ok(num)
236}
237
238async fn query_iter_with_pool<P>(
239    iter: impl Iterator<Item = StatementQuery<'_, P>> + Send,
240    pool: &Pool,
241) -> Result<Vec<RowStreamOwned>, Error>
242where
243    P: AsParams + Send,
244{
245    let mut res = Vec::with_capacity(iter.size_hint().0);
246
247    let mut conn = pool.get().await?;
248
249    for stmt in iter {
250        let stream = stmt.query(&mut conn).await?;
251        res.push(stream);
252    }
253
254    Ok(res)
255}
256
257impl<'c, Q> Execute<&'c Arc<Pool>> for Q
258where
259    Q: Execute<&'c Pool>,
260{
261    type ExecuteOutput = Q::ExecuteOutput;
262    type QueryOutput = Q::QueryOutput;
263
264    #[inline]
265    fn execute(self, pool: &'c Arc<Pool>) -> Self::ExecuteOutput {
266        Q::execute(self, pool)
267    }
268
269    #[inline]
270    fn query(self, pool: &'c Arc<Pool>) -> Self::QueryOutput {
271        Q::query(self, pool)
272    }
273}