1use std::{borrow::Cow, future::ready};
2
3use futures_core::{future::BoxFuture, stream::BoxStream};
4use futures_util::{FutureExt, StreamExt, TryStreamExt};
5use sqlx_core::{
6 database::Database,
7 describe::Describe,
8 executor::{Execute, Executor},
9 logger::QueryLogger,
10 Either,
11};
12
13use super::stream::ResultStream;
14use crate::{
15 connection::websocket::future::{
16 self, ExecuteBatch, ExecutePrepared, GetOrPrepare, WebSocketFuture,
17 },
18 database::Exasol,
19 responses::DescribeStatement,
20 statement::{ExaStatement, ExaStatementMetadata},
21 ExaConnection, SqlxError, SqlxResult,
22};
23
24impl<'c> Executor<'c> for &'c mut ExaConnection {
25 type Database = Exasol;
26
27 fn execute<'e, 'q, E>(
28 self,
29 mut query: E,
30 ) -> BoxFuture<'e, SqlxResult<<Self::Database as Database>::QueryResult>>
31 where
32 'q: 'e,
33 'c: 'e,
34 E: 'q + Execute<'q, Self::Database>,
35 {
36 let sql = query.sql();
37 let persist = query.persistent();
38 let logger = QueryLogger::new(sql, self.log_settings.clone());
39 let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
40 Ok(a) => a,
41 Err(e) => return Box::pin(ready(Err(e))),
42 };
43
44 let filter_fn = |step| async move {
45 Ok(match step {
46 Either::Left(rows) => Some(rows),
47 Either::Right(_) => None,
48 })
49 };
50
51 if let Some(arguments) = arguments {
52 let future = ExecutePrepared::new(sql, persist, arguments);
53 ResultStream::new(&mut self.ws, logger, future)
54 .try_filter_map(filter_fn)
55 .try_collect()
56 .boxed()
57 } else {
58 let future = future::Execute::new(sql);
59 ResultStream::new(&mut self.ws, logger, future)
60 .try_filter_map(filter_fn)
61 .try_collect()
62 .boxed()
63 }
64 }
65
66 fn execute_many<'e, 'q, E>(
67 self,
68 query: E,
69 ) -> BoxStream<'e, SqlxResult<<Self::Database as Database>::QueryResult>>
70 where
71 'q: 'e,
72 'c: 'e,
73 E: 'q + Execute<'q, Self::Database>,
74 {
75 self.fetch_many(query)
76 .try_filter_map(|step| async move {
77 Ok(match step {
78 Either::Left(rows) => Some(rows),
79 Either::Right(_) => None,
80 })
81 })
82 .boxed()
83 }
84
85 fn fetch<'e, 'q, E>(
86 self,
87 mut query: E,
88 ) -> BoxStream<'e, SqlxResult<<Self::Database as Database>::Row>>
89 where
90 'q: 'e,
91 'c: 'e,
92 E: 'q + Execute<'q, Self::Database>,
93 {
94 let sql = query.sql();
95 let persist = query.persistent();
96 let logger = QueryLogger::new(sql, self.log_settings.clone());
97 let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
98 Ok(a) => a,
99 Err(e) => return Box::pin(ready(Err(e)).into_stream()),
100 };
101
102 let filter_fn = |step| async move {
103 Ok(match step {
104 Either::Left(_) => None,
105 Either::Right(row) => Some(row),
106 })
107 };
108
109 if let Some(arguments) = arguments {
110 let future = ExecutePrepared::new(sql, persist, arguments);
111 Box::pin(ResultStream::new(&mut self.ws, logger, future).try_filter_map(filter_fn))
112 } else {
113 let future = future::Execute::new(sql);
114 Box::pin(ResultStream::new(&mut self.ws, logger, future).try_filter_map(filter_fn))
115 }
116 }
117
118 fn fetch_many<'e, 'q, E>(
119 self,
120 mut query: E,
121 ) -> BoxStream<
122 'e,
123 SqlxResult<
124 Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>,
125 >,
126 >
127 where
128 'q: 'e,
129 'c: 'e,
130 E: 'q + Execute<'q, Self::Database>,
131 {
132 let sql = query.sql();
133 let persist = query.persistent();
134 let logger = QueryLogger::new(sql, self.log_settings.clone());
135 let arguments = match query.take_arguments().map_err(SqlxError::Encode) {
136 Ok(a) => a,
137 Err(e) => return Box::pin(ready(Err(e)).into_stream()),
138 };
139
140 if let Some(arguments) = arguments {
141 let future = ExecutePrepared::new(sql, persist, arguments);
142 Box::pin(ResultStream::new(&mut self.ws, logger, future))
143 } else {
144 let future = ExecuteBatch::new(sql);
145 Box::pin(ResultStream::new(&mut self.ws, logger, future))
146 }
147 }
148
149 fn fetch_optional<'e, 'q, E>(
150 self,
151 query: E,
152 ) -> BoxFuture<'e, SqlxResult<Option<<Self::Database as Database>::Row>>>
153 where
154 'q: 'e,
155 'c: 'e,
156 E: 'q + Execute<'q, Self::Database>,
157 {
158 let mut s = self.fetch_many(query);
159
160 Box::pin(async move {
161 while let Some(v) = s.try_next().await? {
162 if let Either::Right(r) = v {
163 return Ok(Some(r));
164 }
165 }
166
167 Ok(None)
168 })
169 }
170
171 fn prepare_with<'e, 'q>(
172 self,
173 sql: &'q str,
174 _parameters: &'e [<Self::Database as Database>::TypeInfo],
175 ) -> BoxFuture<'e, SqlxResult<<Self::Database as Database>::Statement<'q>>>
176 where
177 'q: 'e,
178 'c: 'e,
179 {
180 Box::pin(async move {
181 let prepared = GetOrPrepare::new(sql, true).future(&mut self.ws).await?;
182
183 Ok(ExaStatement {
184 sql: Cow::Borrowed(sql),
185 metadata: ExaStatementMetadata::new(
186 prepared.columns.clone(),
187 prepared.parameters.clone(),
188 ),
189 })
190 })
191 }
192
193 fn describe<'e, 'q>(self, sql: &'q str) -> BoxFuture<'e, SqlxResult<Describe<Self::Database>>>
195 where
196 'q: 'e,
197 'c: 'e,
198 {
199 Box::pin(async move {
200 let DescribeStatement {
201 columns,
202 parameters,
203 ..
204 } = future::Describe::new(sql).future(&mut self.ws).await?;
205
206 let nullable = (0..columns.len()).map(|_| None).collect();
207
208 Ok(Describe {
209 parameters: Some(Either::Left(parameters)),
210 columns,
211 nullable,
212 })
213 })
214 }
215}