sqlx_core_oldapi/pool/
executor.rs1use either::Either;
2use futures_core::future::BoxFuture;
3use futures_core::stream::BoxStream;
4use futures_util::TryStreamExt;
5
6use crate::database::{Database, HasStatement};
7use crate::describe::Describe;
8use crate::error::Error;
9use crate::executor::{Execute, Executor};
10use crate::pool::Pool;
11
12impl<'p, DB: Database> Executor<'p> for &'_ Pool<DB>
13where
14 for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
15{
16 type Database = DB;
17
18 fn fetch_many<'e, 'q: 'e, E>(
19 self,
20 query: E,
21 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
22 where
23 E: Execute<'q, Self::Database> + 'q,
24 {
25 let pool = self.clone();
26
27 Box::pin(try_stream! {
28 let mut conn = pool.acquire().await?;
29 let mut s = conn.fetch_many(query);
30
31 while let Some(v) = s.try_next().await? {
32 r#yield!(v);
33 }
34
35 Ok(())
36 })
37 }
38
39 fn fetch_optional<'e, 'q: 'e, E>(
40 self,
41 query: E,
42 ) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
43 where
44 E: Execute<'q, Self::Database> + 'q,
45 {
46 let pool = self.clone();
47
48 Box::pin(async move { pool.acquire().await?.fetch_optional(query).await })
49 }
50
51 fn prepare_with<'e, 'q: 'e>(
52 self,
53 sql: &'q str,
54 parameters: &'e [<Self::Database as Database>::TypeInfo],
55 ) -> BoxFuture<'e, Result<<Self::Database as HasStatement<'q>>::Statement, Error>> {
56 let pool = self.clone();
57
58 Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await })
59 }
60
61 #[doc(hidden)]
62 fn describe<'e, 'q: 'e>(
63 self,
64 sql: &'q str,
65 ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
66 let pool = self.clone();
67
68 Box::pin(async move { pool.acquire().await?.describe(sql).await })
69 }
70}
71
// Generates `impl Executor<'c> for &'c mut PoolConnection<$DB>`.
//
// Arguments:
// * `$DB` - the database type; becomes the impl's `Database`.
// * `$C`  - accepted by the matcher but not referenced in the expansion below.
// * `$R`  - the row type named in the `fetch_many` / `fetch_optional` returns.
//
// Each generated method reborrows the value the `PoolConnection` dereferences
// to (presumably the underlying `$DB` connection - confirm against
// `PoolConnection`'s `Deref` impl) and forwards the call to that value's own
// `Executor` implementation.
//
// NOTE(review): `unused_macros` is allowed presumably because some build
// configurations never invoke this macro - confirm.
#[allow(unused_macros)]
macro_rules! impl_executor_for_pool_connection {
    ($DB:ident, $C:ident, $R:ident) => {
        impl<'c> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<$DB> {
            type Database = $DB;

            #[inline]
            fn fetch_many<'e, 'q: 'e, E>(
                self,
                query: E,
            ) -> futures_core::stream::BoxStream<
                'e,
                Result<
                    either::Either<<$DB as crate::database::Database>::QueryResult, $R>,
                    crate::error::Error,
                >,
            >
            where
                'c: 'e,
                E: crate::executor::Execute<'q, $DB> + 'q,
            {
                // `&mut **self` goes through the pool wrapper to the inner value.
                crate::executor::Executor::fetch_many(&mut **self, query)
            }

            #[inline]
            fn fetch_optional<'e, 'q: 'e, E>(
                self,
                query: E,
            ) -> futures_core::future::BoxFuture<'e, Result<Option<$R>, crate::error::Error>>
            where
                'c: 'e,
                E: crate::executor::Execute<'q, $DB> + 'q,
            {
                crate::executor::Executor::fetch_optional(&mut **self, query)
            }

            #[inline]
            fn prepare_with<'e, 'q: 'e>(
                self,
                sql: &'q str,
                parameters: &'e [<$DB as crate::database::Database>::TypeInfo],
            ) -> futures_core::future::BoxFuture<
                'e,
                Result<<$DB as crate::database::HasStatement<'q>>::Statement, crate::error::Error>,
            >
            where
                'c: 'e,
            {
                crate::executor::Executor::prepare_with(&mut **self, sql, parameters)
            }

            #[doc(hidden)]
            #[inline]
            fn describe<'e, 'q: 'e>(
                self,
                sql: &'q str,
            ) -> futures_core::future::BoxFuture<
                'e,
                Result<crate::describe::Describe<$DB>, crate::error::Error>,
            >
            where
                'c: 'e,
            {
                crate::executor::Executor::describe(&mut **self, sql)
            }
        }
    };
}