sqlx_core/pool/executor.rs
1use either::Either;
2use futures_core::future::BoxFuture;
3use futures_core::stream::BoxStream;
4use futures_util::TryStreamExt;
5
6use crate::database::Database;
7use crate::describe::Describe;
8use crate::error::Error;
9use crate::executor::{Execute, Executor};
10use crate::pool::Pool;
11use crate::sql_str::SqlStr;
12
13impl<'p, DB: Database> Executor<'p> for &'_ Pool<DB>
14where
15 for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
16{
17 type Database = DB;
18
19 fn fetch_many<'e, 'q: 'e, E>(
20 self,
21 query: E,
22 ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
23 where
24 E: 'q + Execute<'q, Self::Database>,
25 {
26 let pool = self.clone();
27
28 Box::pin(try_stream! {
29 let mut conn = pool.acquire().await?;
30 let mut s = conn.fetch_many(query);
31
32 while let Some(v) = s.try_next().await? {
33 r#yield!(v);
34 }
35
36 Ok(())
37 })
38 }
39
40 fn fetch_optional<'e, 'q: 'e, E>(
41 self,
42 query: E,
43 ) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
44 where
45 E: 'q + Execute<'q, Self::Database>,
46 {
47 let pool = self.clone();
48
49 Box::pin(async move { pool.acquire().await?.fetch_optional(query).await })
50 }
51
52 fn prepare_with<'e>(
53 self,
54 sql: SqlStr,
55 parameters: &'e [<Self::Database as Database>::TypeInfo],
56 ) -> BoxFuture<'e, Result<<Self::Database as Database>::Statement, Error>>
57 where
58 'p: 'e,
59 {
60 let pool = self.clone();
61
62 Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await })
63 }
64
65 #[doc(hidden)]
66 fn describe<'e>(self, sql: SqlStr) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
67 let pool = self.clone();
68
69 Box::pin(async move { pool.acquire().await?.describe(sql).await })
70 }
71}
72
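// The generic impl below is deliberately left commented out: enabling it causes an
// overflow when the compiler evaluates `&mut DB::Connection: Executor`.
// Note that its signatures still take `sql: &'q str`, whereas the live impl above takes `SqlStr`.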
76// impl<'c, DB: Database> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<DB>
77// where
78// &'c mut DB::Connection: Executor<'c, Database = DB>,
79// {
80// type Database = DB;
81//
82//
83//
84// #[inline]
85// fn fetch_many<'e, 'q: 'e, E: 'q>(
86// self,
87// query: E,
88// ) -> futures_core::stream::BoxStream<
89// 'e,
90// Result<
91// either::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
92// crate::error::Error,
93// >,
94// >
95// where
96// 'c: 'e,
97// E: crate::executor::Execute<'q, DB>,
98// {
99// (**self).fetch_many(query)
100// }
101//
102// #[inline]
103// fn fetch_optional<'e, 'q: 'e, E: 'q>(
104// self,
105// query: E,
106// ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
107// where
108// 'c: 'e,
109// E: crate::executor::Execute<'q, DB>,
110// {
111// (**self).fetch_optional(query)
112// }
113//
114// #[inline]
115// fn prepare_with<'e, 'q: 'e>(
116// self,
117// sql: &'q str,
118// parameters: &'e [<DB as crate::database::Database>::TypeInfo],
119// ) -> futures_core::future::BoxFuture<
120// 'e,
121// Result<<DB as crate::database::Database>::Statement<'q>, crate::error::Error>,
122// >
123// where
124// 'c: 'e,
125// {
126// (**self).prepare_with(sql, parameters)
127// }
128//
129// #[doc(hidden)]
130// #[inline]
131// fn describe<'e, 'q: 'e>(
132// self,
133// sql: &'q str,
134// ) -> futures_core::future::BoxFuture<
135// 'e,
136// Result<crate::describe::Describe<DB>, crate::error::Error>,
137// >
138// where
139// 'c: 'e,
140// {
141// (**self).describe(sql)
142// }
143// }