sqlx_core/pool/executor.rs
1use either::Either;
2use futures_core::future::BoxFuture;
3use futures_core::stream::BoxStream;
4use futures_util::TryStreamExt;
5
6use crate::database::Database;
7use crate::error::Error;
8use crate::executor::{Execute, Executor};
9use crate::pool::Pool;
10use crate::sql_str::SqlStr;
11
12impl<'p, DB: Database> Executor<'p> for &'_ Pool<DB>
13where
14 for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
15{
16 type Database = DB;
17
18 fn fetch_many<'e, 'q: 'e, E>(
19 self,
20 query: E,
21 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
22 where
23 E: 'q + Execute<'q, Self::Database>,
24 {
25 let pool = self.clone();
26
27 Box::pin(try_stream! {
28 let mut conn = pool.acquire().await?;
29 let mut s = conn.fetch_many(query);
30
31 while let Some(v) = s.try_next().await? {
32 r#yield!(v);
33 }
34
35 Ok(())
36 })
37 }
38
39 fn fetch_optional<'e, 'q: 'e, E>(
40 self,
41 query: E,
42 ) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
43 where
44 E: 'q + Execute<'q, Self::Database>,
45 {
46 let pool = self.clone();
47
48 Box::pin(async move { pool.acquire().await?.fetch_optional(query).await })
49 }
50
51 fn prepare_with<'e>(
52 self,
53 sql: SqlStr,
54 parameters: &'e [<Self::Database as Database>::TypeInfo],
55 ) -> BoxFuture<'e, Result<<Self::Database as Database>::Statement, Error>>
56 where
57 'p: 'e,
58 {
59 let pool = self.clone();
60
61 Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await })
62 }
63
64 #[doc(hidden)]
65 #[cfg(feature = "offline")]
66 fn describe<'e>(
67 self,
68 sql: SqlStr,
69 ) -> BoxFuture<'e, Result<crate::describe::Describe<Self::Database>, Error>> {
70 let pool = self.clone();
71
72 Box::pin(async move { pool.acquire().await?.describe(sql).await })
73 }
74}
75
76// Causes an overflow when evaluating `&mut DB::Connection: Executor`.
77//
78//
79// impl<'c, DB: Database> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<DB>
80// where
81// &'c mut DB::Connection: Executor<'c, Database = DB>,
82// {
83// type Database = DB;
84//
85//
86//
87// #[inline]
88// fn fetch_many<'e, 'q: 'e, E: 'q>(
89// self,
90// query: E,
91// ) -> futures_core::stream::BoxStream<
92// 'e,
93// Result<
94// either::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
95// crate::error::Error,
96// >,
97// >
98// where
99// 'c: 'e,
100// E: crate::executor::Execute<'q, DB>,
101// {
102// (**self).fetch_many(query)
103// }
104//
105// #[inline]
106// fn fetch_optional<'e, 'q: 'e, E: 'q>(
107// self,
108// query: E,
109// ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
110// where
111// 'c: 'e,
112// E: crate::executor::Execute<'q, DB>,
113// {
114// (**self).fetch_optional(query)
115// }
116//
117// #[inline]
118// fn prepare_with<'e, 'q: 'e>(
119// self,
120// sql: &'q str,
121// parameters: &'e [<DB as crate::database::Database>::TypeInfo],
122// ) -> futures_core::future::BoxFuture<
123// 'e,
124// Result<<DB as crate::database::Database>::Statement<'q>, crate::error::Error>,
125// >
126// where
127// 'c: 'e,
128// {
129// (**self).prepare_with(sql, parameters)
130// }
131//
132// #[doc(hidden)]
133// #[cfg(feature = "offline")]
134// #[inline]
135// fn describe<'e, 'q: 'e>(
136// self,
137// sql: &'q str,
138// ) -> futures_core::future::BoxFuture<
139// 'e,
140// Result<crate::describe::Describe<DB>, crate::error::Error>,
141// >
142// where
143// 'c: 'e,
144// {
145// (**self).describe(sql)
146// }
147// }