// sqlx_core_oldapi/any/connection/executor.rs

1use crate::any::connection::AnyConnectionKind;
2use crate::any::{
3    Any, AnyColumn, AnyConnection, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo,
4};
5use crate::database::Database;
6use crate::describe::Describe;
7use crate::error::Error;
8use crate::executor::{Execute, Executor};
9use either::Either;
10use futures_core::future::BoxFuture;
11use futures_core::stream::BoxStream;
12use futures_util::{StreamExt, TryStreamExt};
13
14impl<'c> Executor<'c> for &'c mut AnyConnection {
15    type Database = Any;
16
17    fn fetch_many<'e, 'q: 'e, E>(
18        self,
19        mut query: E,
20    ) -> BoxStream<'e, Result<Either<AnyQueryResult, AnyRow>, Error>>
21    where
22        'c: 'e,
23        E: Execute<'q, Self::Database> + 'q,
24    {
25        let arguments = query.take_arguments();
26        let query = query.sql();
27
28        match &mut self.0 {
29            #[cfg(feature = "postgres")]
30            AnyConnectionKind::Postgres(conn) => conn
31                .fetch_many((query, arguments.map(Into::into)))
32                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
33                .boxed(),
34
35            #[cfg(feature = "mysql")]
36            AnyConnectionKind::MySql(conn) => conn
37                .fetch_many((query, arguments.map(Into::into)))
38                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
39                .boxed(),
40
41            #[cfg(feature = "sqlite")]
42            AnyConnectionKind::Sqlite(conn) => conn
43                .fetch_many((query, arguments.map(Into::into)))
44                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
45                .boxed(),
46
47            #[cfg(feature = "mssql")]
48            AnyConnectionKind::Mssql(conn) => conn
49                .fetch_many((query, arguments.map(Into::into)))
50                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
51                .boxed(),
52
53            #[cfg(feature = "odbc")]
54            AnyConnectionKind::Odbc(conn) => conn
55                .fetch_many((query, arguments.map(Into::into)))
56                .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
57                .boxed(),
58        }
59    }
60
61    fn fetch_optional<'e, 'q: 'e, E>(
62        self,
63        mut query: E,
64    ) -> BoxFuture<'e, Result<Option<AnyRow>, Error>>
65    where
66        'c: 'e,
67        E: Execute<'q, Self::Database> + 'q,
68    {
69        let arguments = query.take_arguments();
70        let query = query.sql();
71
72        Box::pin(async move {
73            Ok(match &mut self.0 {
74                #[cfg(feature = "postgres")]
75                AnyConnectionKind::Postgres(conn) => conn
76                    .fetch_optional((query, arguments.map(Into::into)))
77                    .await?
78                    .map(Into::into),
79
80                #[cfg(feature = "mysql")]
81                AnyConnectionKind::MySql(conn) => conn
82                    .fetch_optional((query, arguments.map(Into::into)))
83                    .await?
84                    .map(Into::into),
85
86                #[cfg(feature = "sqlite")]
87                AnyConnectionKind::Sqlite(conn) => conn
88                    .fetch_optional((query, arguments.map(Into::into)))
89                    .await?
90                    .map(Into::into),
91
92                #[cfg(feature = "mssql")]
93                AnyConnectionKind::Mssql(conn) => conn
94                    .fetch_optional((query, arguments.map(Into::into)))
95                    .await?
96                    .map(Into::into),
97
98                #[cfg(feature = "odbc")]
99                AnyConnectionKind::Odbc(conn) => conn
100                    .fetch_optional((query, arguments.map(Into::into)))
101                    .await?
102                    .map(Into::into),
103            })
104        })
105    }
106
107    fn prepare_with<'e, 'q: 'e>(
108        self,
109        sql: &'q str,
110        _parameters: &[AnyTypeInfo],
111    ) -> BoxFuture<'e, Result<AnyStatement<'q>, Error>>
112    where
113        'c: 'e,
114    {
115        Box::pin(async move {
116            Ok(match &mut self.0 {
117                // To match other databases here, we explicitly ignore the parameter types
118                #[cfg(feature = "postgres")]
119                AnyConnectionKind::Postgres(conn) => conn.prepare(sql).await.map(Into::into)?,
120
121                #[cfg(feature = "mysql")]
122                AnyConnectionKind::MySql(conn) => conn.prepare(sql).await.map(Into::into)?,
123
124                #[cfg(feature = "sqlite")]
125                AnyConnectionKind::Sqlite(conn) => conn.prepare(sql).await.map(Into::into)?,
126
127                #[cfg(feature = "mssql")]
128                AnyConnectionKind::Mssql(conn) => conn.prepare(sql).await.map(Into::into)?,
129
130                #[cfg(feature = "odbc")]
131                AnyConnectionKind::Odbc(conn) => conn.prepare(sql).await.map(Into::into)?,
132            })
133        })
134    }
135
136    fn describe<'e, 'q: 'e>(
137        self,
138        sql: &'q str,
139    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
140    where
141        'c: 'e,
142    {
143        Box::pin(async move {
144            Ok(match &mut self.0 {
145                #[cfg(feature = "postgres")]
146                AnyConnectionKind::Postgres(conn) => conn.describe(sql).await.map(map_describe)?,
147
148                #[cfg(feature = "mysql")]
149                AnyConnectionKind::MySql(conn) => conn.describe(sql).await.map(map_describe)?,
150
151                #[cfg(feature = "sqlite")]
152                AnyConnectionKind::Sqlite(conn) => conn.describe(sql).await.map(map_describe)?,
153
154                #[cfg(feature = "mssql")]
155                AnyConnectionKind::Mssql(conn) => conn.describe(sql).await.map(map_describe)?,
156
157                #[cfg(feature = "odbc")]
158                AnyConnectionKind::Odbc(conn) => conn.describe(sql).await.map(map_describe)?,
159            })
160        })
161    }
162}
163
164fn map_describe<DB: Database>(info: Describe<DB>) -> Describe<Any>
165where
166    AnyTypeInfo: From<DB::TypeInfo>,
167    AnyColumn: From<DB::Column>,
168{
169    let parameters = match info.parameters {
170        None => None,
171        Some(Either::Right(num)) => Some(Either::Right(num)),
172        Some(Either::Left(params)) => {
173            Some(Either::Left(params.into_iter().map(Into::into).collect()))
174        }
175    };
176
177    Describe {
178        parameters,
179        nullable: info.nullable,
180        columns: info.columns.into_iter().map(Into::into).collect(),
181    }
182}