sqlx_core_oldapi/pool/
executor.rs

1use either::Either;
2use futures_core::future::BoxFuture;
3use futures_core::stream::BoxStream;
4use futures_util::TryStreamExt;
5
6use crate::database::{Database, HasStatement};
7use crate::describe::Describe;
8use crate::error::Error;
9use crate::executor::{Execute, Executor};
10use crate::pool::Pool;
11
12impl<'p, DB: Database> Executor<'p> for &'_ Pool<DB>
13where
14    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
15{
16    type Database = DB;
17
18    fn fetch_many<'e, 'q: 'e, E>(
19        self,
20        query: E,
21    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
22    where
23        E: Execute<'q, Self::Database> + 'q,
24    {
25        let pool = self.clone();
26
27        Box::pin(try_stream! {
28            let mut conn = pool.acquire().await?;
29            let mut s = conn.fetch_many(query);
30
31            while let Some(v) = s.try_next().await? {
32                r#yield!(v);
33            }
34
35            Ok(())
36        })
37    }
38
39    fn fetch_optional<'e, 'q: 'e, E>(
40        self,
41        query: E,
42    ) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
43    where
44        E: Execute<'q, Self::Database> + 'q,
45    {
46        let pool = self.clone();
47
48        Box::pin(async move { pool.acquire().await?.fetch_optional(query).await })
49    }
50
51    fn prepare_with<'e, 'q: 'e>(
52        self,
53        sql: &'q str,
54        parameters: &'e [<Self::Database as Database>::TypeInfo],
55    ) -> BoxFuture<'e, Result<<Self::Database as HasStatement<'q>>::Statement, Error>> {
56        let pool = self.clone();
57
58        Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await })
59    }
60
61    #[doc(hidden)]
62    fn describe<'e, 'q: 'e>(
63        self,
64        sql: &'q str,
65    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
66        let pool = self.clone();
67
68        Box::pin(async move { pool.acquire().await?.describe(sql).await })
69    }
70}
71
// Generates `impl Executor<'c> for &'c mut PoolConnection<$DB>` for one
// concrete database type. Every generated method forwards its arguments
// unchanged to the same-named method on the value reached by `**self`.
//
// Macro parameters:
// - `$DB`: the database type; used as `Executor::Database` and throughout the
//   generated signatures.
// - `$C`: accepted by the matcher but not referenced in the expansion below.
// - `$R`: the row type named in the `fetch_many` / `fetch_optional` results.
//
// NOTE: required due to lack of lazy normalization
#[allow(unused_macros)]
macro_rules! impl_executor_for_pool_connection {
    ($DB:ident, $C:ident, $R:ident) => {
        impl<'c> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<$DB> {
            type Database = $DB;

            // Forwards to `fetch_many` on the value behind the pooled connection.
            #[inline]
            fn fetch_many<'e, 'q: 'e, E>(
                self,
                query: E,
            ) -> futures_core::stream::BoxStream<
                'e,
                Result<
                    either::Either<<$DB as crate::database::Database>::QueryResult, $R>,
                    crate::error::Error,
                >,
            >
            where
                'c: 'e,
                E: crate::executor::Execute<'q, $DB> + 'q,
            {
                // `*self` is the `PoolConnection`; the second `*` presumably goes
                // through its `Deref` impl to the wrapped connection — confirm
                // against the `PoolConnection` definition.
                (**self).fetch_many(query)
            }

            // Forwards to `fetch_optional` on the value behind the pooled connection.
            #[inline]
            fn fetch_optional<'e, 'q: 'e, E>(
                self,
                query: E,
            ) -> futures_core::future::BoxFuture<'e, Result<Option<$R>, crate::error::Error>>
            where
                'c: 'e,
                E: crate::executor::Execute<'q, $DB> + 'q,
            {
                (**self).fetch_optional(query)
            }

            // Forwards to `prepare_with`, passing `sql` and `parameters` through as-is.
            #[inline]
            fn prepare_with<'e, 'q: 'e>(
                self,
                sql: &'q str,
                parameters: &'e [<$DB as crate::database::Database>::TypeInfo],
            ) -> futures_core::future::BoxFuture<
                'e,
                Result<<$DB as crate::database::HasStatement<'q>>::Statement, crate::error::Error>,
            >
            where
                'c: 'e,
            {
                (**self).prepare_with(sql, parameters)
            }

            // Forwards to `describe`; hidden from rendered docs like the trait-level
            // counterpart on the pool impl in this file.
            #[doc(hidden)]
            #[inline]
            fn describe<'e, 'q: 'e>(
                self,
                sql: &'q str,
            ) -> futures_core::future::BoxFuture<
                'e,
                Result<crate::describe::Describe<$DB>, crate::error::Error>,
            >
            where
                'c: 'e,
            {
                (**self).describe(sql)
            }
        }
    };
}