sqlx_core_oldapi/any/connection/
executor.rs

1use crate::any::connection::AnyConnectionKind;
2use crate::any::{
3    Any, AnyColumn, AnyConnection, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo,
4};
5use crate::database::Database;
6use crate::describe::Describe;
7use crate::error::Error;
8use crate::executor::{Execute, Executor};
9use either::Either;
10use futures_core::future::BoxFuture;
11use futures_core::stream::BoxStream;
12use futures_util::{StreamExt, TryStreamExt};
13
14impl<'c> Executor<'c> for &'c mut AnyConnection {
15    type Database = Any;
16
17    fn fetch_many<'e, 'q: 'e, E: 'q>(
18        self,
19        mut query: E,
20    ) -> BoxStream<'e, Result<Either<AnyQueryResult, AnyRow>, Error>>
21    where
22        'c: 'e,
23        E: Execute<'q, Self::Database>,
24    {
25        let arguments = query.take_arguments();
26        let query = query.sql();
27
28        match &mut self.0 {
29            #[cfg(feature = "postgres")]
30            AnyConnectionKind::Postgres(conn) => conn
31                .fetch_many((query, arguments.map(Into::into)))
32                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
33                .boxed(),
34
35            #[cfg(feature = "mysql")]
36            AnyConnectionKind::MySql(conn) => conn
37                .fetch_many((query, arguments.map(Into::into)))
38                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
39                .boxed(),
40
41            #[cfg(feature = "sqlite")]
42            AnyConnectionKind::Sqlite(conn) => conn
43                .fetch_many((query, arguments.map(Into::into)))
44                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
45                .boxed(),
46
47            #[cfg(feature = "mssql")]
48            AnyConnectionKind::Mssql(conn) => conn
49                .fetch_many((query, arguments.map(Into::into)))
50                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
51                .boxed(),
52        }
53    }
54
55    fn fetch_optional<'e, 'q: 'e, E: 'q>(
56        self,
57        mut query: E,
58    ) -> BoxFuture<'e, Result<Option<AnyRow>, Error>>
59    where
60        'c: 'e,
61        E: Execute<'q, Self::Database>,
62    {
63        let arguments = query.take_arguments();
64        let query = query.sql();
65
66        Box::pin(async move {
67            Ok(match &mut self.0 {
68                #[cfg(feature = "postgres")]
69                AnyConnectionKind::Postgres(conn) => conn
70                    .fetch_optional((query, arguments.map(Into::into)))
71                    .await?
72                    .map(Into::into),
73
74                #[cfg(feature = "mysql")]
75                AnyConnectionKind::MySql(conn) => conn
76                    .fetch_optional((query, arguments.map(Into::into)))
77                    .await?
78                    .map(Into::into),
79
80                #[cfg(feature = "sqlite")]
81                AnyConnectionKind::Sqlite(conn) => conn
82                    .fetch_optional((query, arguments.map(Into::into)))
83                    .await?
84                    .map(Into::into),
85
86                #[cfg(feature = "mssql")]
87                AnyConnectionKind::Mssql(conn) => conn
88                    .fetch_optional((query, arguments.map(Into::into)))
89                    .await?
90                    .map(Into::into),
91            })
92        })
93    }
94
95    fn prepare_with<'e, 'q: 'e>(
96        self,
97        sql: &'q str,
98        _parameters: &[AnyTypeInfo],
99    ) -> BoxFuture<'e, Result<AnyStatement<'q>, Error>>
100    where
101        'c: 'e,
102    {
103        Box::pin(async move {
104            Ok(match &mut self.0 {
105                // To match other databases here, we explicitly ignore the parameter types
106                #[cfg(feature = "postgres")]
107                AnyConnectionKind::Postgres(conn) => conn.prepare(sql).await.map(Into::into)?,
108
109                #[cfg(feature = "mysql")]
110                AnyConnectionKind::MySql(conn) => conn.prepare(sql).await.map(Into::into)?,
111
112                #[cfg(feature = "sqlite")]
113                AnyConnectionKind::Sqlite(conn) => conn.prepare(sql).await.map(Into::into)?,
114
115                #[cfg(feature = "mssql")]
116                AnyConnectionKind::Mssql(conn) => conn.prepare(sql).await.map(Into::into)?,
117            })
118        })
119    }
120
121    fn describe<'e, 'q: 'e>(
122        self,
123        sql: &'q str,
124    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
125    where
126        'c: 'e,
127    {
128        Box::pin(async move {
129            Ok(match &mut self.0 {
130                #[cfg(feature = "postgres")]
131                AnyConnectionKind::Postgres(conn) => conn.describe(sql).await.map(map_describe)?,
132
133                #[cfg(feature = "mysql")]
134                AnyConnectionKind::MySql(conn) => conn.describe(sql).await.map(map_describe)?,
135
136                #[cfg(feature = "sqlite")]
137                AnyConnectionKind::Sqlite(conn) => conn.describe(sql).await.map(map_describe)?,
138
139                #[cfg(feature = "mssql")]
140                AnyConnectionKind::Mssql(conn) => conn.describe(sql).await.map(map_describe)?,
141            })
142        })
143    }
144}
145
146fn map_describe<DB: Database>(info: Describe<DB>) -> Describe<Any>
147where
148    AnyTypeInfo: From<DB::TypeInfo>,
149    AnyColumn: From<DB::Column>,
150{
151    let parameters = match info.parameters {
152        None => None,
153        Some(Either::Right(num)) => Some(Either::Right(num)),
154        Some(Either::Left(params)) => {
155            Some(Either::Left(params.into_iter().map(Into::into).collect()))
156        }
157    };
158
159    Describe {
160        parameters,
161        nullable: info.nullable,
162        columns: info.columns.into_iter().map(Into::into).collect(),
163    }
164}