sqlx_exasol_impl/connection/
executor.rs1use futures_core::{future::BoxFuture, stream::BoxStream};
2use futures_util::{FutureExt, StreamExt, TryStreamExt};
3use sqlx_core::{
4 describe::Describe,
5 executor::{Execute, Executor},
6 logger::QueryLogger,
7 sql_str::SqlStr,
8 Either,
9};
10
11use super::stream::ResultStream;
12use crate::{
13 connection::websocket::future::{
14 self, ExecuteBatch, ExecutePrepared, GetOrPrepare, WebSocketFuture,
15 },
16 database::Exasol,
17 responses::DescribeStatement,
18 statement::{ExaStatement, ExaStatementMetadata},
19 ExaConnection, ExaQueryResult, ExaRow, ExaTypeInfo, SqlxError, SqlxResult,
20};
21
22impl<'c> Executor<'c> for &'c mut ExaConnection {
23 type Database = Exasol;
24
25 fn execute<'e, 'q, E>(self, query: E) -> BoxFuture<'e, SqlxResult<ExaQueryResult>>
26 where
27 'q: 'e,
28 'c: 'e,
29 E: 'q + Execute<'q, Exasol>,
30 {
31 match self.fetch_impl(query) {
32 Ok(stream) => stream
33 .try_filter_map(|v| std::future::ready(Ok(v.left())))
34 .try_collect()
35 .boxed(),
36 Err(e) => std::future::ready(Err(e)).boxed(),
37 }
38 }
39
40 fn execute_many<'e, 'q, E>(self, query: E) -> BoxStream<'e, SqlxResult<ExaQueryResult>>
41 where
42 'q: 'e,
43 'c: 'e,
44 E: 'q + Execute<'q, Exasol>,
45 {
46 match self.fetch_many_impl(query) {
47 Ok(stream) => stream
48 .try_filter_map(|step| std::future::ready(Ok(step.left())))
49 .boxed(),
50 Err(e) => std::future::ready(Err(e)).into_stream().boxed(),
51 }
52 }
53
54 fn fetch<'e, 'q, E>(self, query: E) -> BoxStream<'e, SqlxResult<ExaRow>>
55 where
56 'q: 'e,
57 'c: 'e,
58 E: 'q + Execute<'q, Exasol>,
59 {
60 match self.fetch_impl(query) {
61 Ok(stream) => stream
62 .try_filter_map(|v| std::future::ready(Ok(v.right())))
63 .boxed(),
64 Err(e) => std::future::ready(Err(e)).into_stream().boxed(),
65 }
66 }
67
68 fn fetch_many<'e, 'q, E>(
69 self,
70 query: E,
71 ) -> BoxStream<'e, SqlxResult<Either<ExaQueryResult, ExaRow>>>
72 where
73 'q: 'e,
74 'c: 'e,
75 E: 'q + Execute<'q, Exasol>,
76 {
77 match self.fetch_many_impl(query) {
78 Ok(stream) => stream.boxed(),
79 Err(e) => std::future::ready(Err(e)).into_stream().boxed(),
80 }
81 }
82
83 fn fetch_all<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, SqlxResult<Vec<ExaRow>>>
84 where
85 'c: 'e,
86 E: 'q + Execute<'q, Self::Database>,
87 {
88 match self.fetch_impl(query) {
89 Ok(stream) => stream
90 .try_filter_map(|v| std::future::ready(Ok(v.right())))
91 .try_collect()
92 .boxed(),
93 Err(e) => std::future::ready(Err(e)).boxed(),
94 }
95 }
96
97 fn fetch_one<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, SqlxResult<ExaRow>>
98 where
99 'c: 'e,
100 E: 'q + Execute<'q, Self::Database>,
101 {
102 let stream = match self.fetch_impl(query) {
103 Ok(stream) => stream,
104 Err(e) => return std::future::ready(Err(e)).boxed(),
105 };
106
107 Box::pin(async move {
108 stream
109 .try_filter_map(|v| std::future::ready(Ok(v.right())))
110 .try_next()
111 .await
112 .transpose()
113 .unwrap_or(Err(SqlxError::RowNotFound))
114 })
115 }
116
117 fn fetch_optional<'e, 'q, E>(self, query: E) -> BoxFuture<'e, SqlxResult<Option<ExaRow>>>
118 where
119 'q: 'e,
120 'c: 'e,
121 E: 'q + Execute<'q, Exasol>,
122 {
123 let stream = match self.fetch_impl(query) {
124 Ok(stream) => stream,
125 Err(e) => return std::future::ready(Err(e)).boxed(),
126 };
127
128 Box::pin(async move {
129 stream
130 .try_filter_map(|v| std::future::ready(Ok(v.right())))
131 .try_next()
132 .await
133 })
134 }
135
136 fn prepare_with<'e>(
137 self,
138 sql: SqlStr,
139 _parameters: &'e [ExaTypeInfo],
140 ) -> BoxFuture<'e, SqlxResult<ExaStatement>>
141 where
142 'c: 'e,
143 {
144 Box::pin(async move {
145 let prepared = GetOrPrepare::new(sql.clone(), true)
146 .future(&mut self.ws)
147 .await?;
148
149 Ok(ExaStatement {
150 sql,
151 metadata: ExaStatementMetadata::new(
152 prepared.columns.clone(),
153 prepared.parameters.clone(),
154 ),
155 })
156 })
157 }
158
159 fn describe<'e>(self, sql: SqlStr) -> BoxFuture<'e, SqlxResult<Describe<Exasol>>>
161 where
162 'c: 'e,
163 {
164 Box::pin(async move {
165 let DescribeStatement {
166 columns,
167 parameters,
168 ..
169 } = future::Describe::new(sql).future(&mut self.ws).await?;
170
171 let nullable = (0..columns.len()).map(|_| None).collect();
172
173 Ok(Describe {
174 parameters: Some(Either::Left(parameters)),
175 columns,
176 nullable,
177 })
178 })
179 }
180}
181
182impl ExaConnection {
183 fn fetch_impl<'c, 'e, 'q, E>(&'c mut self, mut query: E) -> SqlxResult<ResultStream<'e>>
184 where
185 'q: 'e,
186 'c: 'e,
187 E: 'q + Execute<'q, Exasol>,
188 {
189 let persist = query.persistent();
190 let arguments = query.take_arguments().map_err(SqlxError::Encode)?;
191 let logger = QueryLogger::new(query.sql(), self.log_settings.clone());
192 let sql = logger.sql().clone();
193
194 if let Some(arguments) = arguments {
195 let future = ExecutePrepared::new(sql, persist, arguments);
196 Ok(ResultStream::new(&mut self.ws, logger, future))
197 } else {
198 let future = future::Execute::new(sql);
199 Ok(ResultStream::new(&mut self.ws, logger, future))
200 }
201 }
202
203 fn fetch_many_impl<'c, 'e, 'q, E>(&'c mut self, mut query: E) -> SqlxResult<ResultStream<'e>>
204 where
205 'q: 'e,
206 'c: 'e,
207 E: 'q + Execute<'q, Exasol>,
208 {
209 let persist = query.persistent();
210 let arguments = query.take_arguments().map_err(SqlxError::Encode)?;
211 let logger = QueryLogger::new(query.sql(), self.log_settings.clone());
212 let sql = logger.sql().clone();
213
214 if let Some(arguments) = arguments {
215 let future = ExecutePrepared::new(sql, persist, arguments);
216 Ok(ResultStream::new(&mut self.ws, logger, future))
217 } else {
218 let future = ExecuteBatch::new(sql);
219 Ok(ResultStream::new(&mut self.ws, logger, future))
220 }
221 }
222}