sqlx_core_oldapi/any/connection/
executor.rs1use crate::any::connection::AnyConnectionKind;
2use crate::any::{
3 Any, AnyColumn, AnyConnection, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo,
4};
5use crate::database::Database;
6use crate::describe::Describe;
7use crate::error::Error;
8use crate::executor::{Execute, Executor};
9use either::Either;
10use futures_core::future::BoxFuture;
11use futures_core::stream::BoxStream;
12use futures_util::{StreamExt, TryStreamExt};
13
14impl<'c> Executor<'c> for &'c mut AnyConnection {
15 type Database = Any;
16
17 fn fetch_many<'e, 'q: 'e, E>(
18 self,
19 mut query: E,
20 ) -> BoxStream<'e, Result<Either<AnyQueryResult, AnyRow>, Error>>
21 where
22 'c: 'e,
23 E: Execute<'q, Self::Database> + 'q,
24 {
25 let arguments = query.take_arguments();
26 let query = query.sql();
27
28 match &mut self.0 {
29 #[cfg(feature = "postgres")]
30 AnyConnectionKind::Postgres(conn) => conn
31 .fetch_many((query, arguments.map(Into::into)))
32 .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
33 .boxed(),
34
35 #[cfg(feature = "mysql")]
36 AnyConnectionKind::MySql(conn) => conn
37 .fetch_many((query, arguments.map(Into::into)))
38 .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
39 .boxed(),
40
41 #[cfg(feature = "sqlite")]
42 AnyConnectionKind::Sqlite(conn) => conn
43 .fetch_many((query, arguments.map(Into::into)))
44 .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
45 .boxed(),
46
47 #[cfg(feature = "mssql")]
48 AnyConnectionKind::Mssql(conn) => conn
49 .fetch_many((query, arguments.map(Into::into)))
50 .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
51 .boxed(),
52
53 #[cfg(feature = "odbc")]
54 AnyConnectionKind::Odbc(conn) => conn
55 .fetch_many((query, arguments.map(Into::into)))
56 .map_ok(|v| v.map_right(Into::into).map_left(Into::into))
57 .boxed(),
58 }
59 }
60
61 fn fetch_optional<'e, 'q: 'e, E>(
62 self,
63 mut query: E,
64 ) -> BoxFuture<'e, Result<Option<AnyRow>, Error>>
65 where
66 'c: 'e,
67 E: Execute<'q, Self::Database> + 'q,
68 {
69 let arguments = query.take_arguments();
70 let query = query.sql();
71
72 Box::pin(async move {
73 Ok(match &mut self.0 {
74 #[cfg(feature = "postgres")]
75 AnyConnectionKind::Postgres(conn) => conn
76 .fetch_optional((query, arguments.map(Into::into)))
77 .await?
78 .map(Into::into),
79
80 #[cfg(feature = "mysql")]
81 AnyConnectionKind::MySql(conn) => conn
82 .fetch_optional((query, arguments.map(Into::into)))
83 .await?
84 .map(Into::into),
85
86 #[cfg(feature = "sqlite")]
87 AnyConnectionKind::Sqlite(conn) => conn
88 .fetch_optional((query, arguments.map(Into::into)))
89 .await?
90 .map(Into::into),
91
92 #[cfg(feature = "mssql")]
93 AnyConnectionKind::Mssql(conn) => conn
94 .fetch_optional((query, arguments.map(Into::into)))
95 .await?
96 .map(Into::into),
97
98 #[cfg(feature = "odbc")]
99 AnyConnectionKind::Odbc(conn) => conn
100 .fetch_optional((query, arguments.map(Into::into)))
101 .await?
102 .map(Into::into),
103 })
104 })
105 }
106
107 fn prepare_with<'e, 'q: 'e>(
108 self,
109 sql: &'q str,
110 _parameters: &[AnyTypeInfo],
111 ) -> BoxFuture<'e, Result<AnyStatement<'q>, Error>>
112 where
113 'c: 'e,
114 {
115 Box::pin(async move {
116 Ok(match &mut self.0 {
117 #[cfg(feature = "postgres")]
119 AnyConnectionKind::Postgres(conn) => conn.prepare(sql).await.map(Into::into)?,
120
121 #[cfg(feature = "mysql")]
122 AnyConnectionKind::MySql(conn) => conn.prepare(sql).await.map(Into::into)?,
123
124 #[cfg(feature = "sqlite")]
125 AnyConnectionKind::Sqlite(conn) => conn.prepare(sql).await.map(Into::into)?,
126
127 #[cfg(feature = "mssql")]
128 AnyConnectionKind::Mssql(conn) => conn.prepare(sql).await.map(Into::into)?,
129
130 #[cfg(feature = "odbc")]
131 AnyConnectionKind::Odbc(conn) => conn.prepare(sql).await.map(Into::into)?,
132 })
133 })
134 }
135
136 fn describe<'e, 'q: 'e>(
137 self,
138 sql: &'q str,
139 ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
140 where
141 'c: 'e,
142 {
143 Box::pin(async move {
144 Ok(match &mut self.0 {
145 #[cfg(feature = "postgres")]
146 AnyConnectionKind::Postgres(conn) => conn.describe(sql).await.map(map_describe)?,
147
148 #[cfg(feature = "mysql")]
149 AnyConnectionKind::MySql(conn) => conn.describe(sql).await.map(map_describe)?,
150
151 #[cfg(feature = "sqlite")]
152 AnyConnectionKind::Sqlite(conn) => conn.describe(sql).await.map(map_describe)?,
153
154 #[cfg(feature = "mssql")]
155 AnyConnectionKind::Mssql(conn) => conn.describe(sql).await.map(map_describe)?,
156
157 #[cfg(feature = "odbc")]
158 AnyConnectionKind::Odbc(conn) => conn.describe(sql).await.map(map_describe)?,
159 })
160 })
161 }
162}
163
164fn map_describe<DB: Database>(info: Describe<DB>) -> Describe<Any>
165where
166 AnyTypeInfo: From<DB::TypeInfo>,
167 AnyColumn: From<DB::Column>,
168{
169 let parameters = match info.parameters {
170 None => None,
171 Some(Either::Right(num)) => Some(Either::Right(num)),
172 Some(Either::Left(params)) => {
173 Some(Either::Left(params.into_iter().map(Into::into).collect()))
174 }
175 };
176
177 Describe {
178 parameters,
179 nullable: info.nullable,
180 columns: info.columns.into_iter().map(Into::into).collect(),
181 }
182}