// sqlx_exasol_impl/connection/mod.rs
#[cfg(feature = "etl")]
2pub mod etl;
3mod executor;
4pub mod stream;
5pub mod websocket;
6
7use std::{
8 fmt::Write,
9 net::{SocketAddr, ToSocketAddrs},
10};
11
12use futures_util::SinkExt;
13use rand::{seq::SliceRandom, thread_rng};
14use sqlx_core::{
15 connection::{Connection, LogSettings},
16 executor::Executor,
17 transaction::Transaction,
18};
19use websocket::{socket::WithExaSocket, ExaWebSocket};
20
21use crate::{
22 connection::websocket::{
23 future::{ClosePrepared, Disconnect, SetAttributes, WebSocketFuture},
24 WithMaybeTlsExaSocket,
25 },
26 database::Exasol,
27 options::ExaConnectOptions,
28 responses::{ExaAttributes, SessionInfo},
29 SqlxError, SqlxResult,
30};
31
/// A connection to an Exasol database, driven through an [`ExaWebSocket`].
///
/// Values are created by `ExaConnection::establish`, which resolves and tries the
/// configured hosts until a websocket session can be opened.
#[derive(Debug)]
pub struct ExaConnection {
    /// Websocket used for every request issued by this connection. It also carries
    /// the connection attributes, the statement cache and the pending
    /// close/rollback requests read elsewhere in this module.
    pub(crate) ws: ExaWebSocket,
    /// Logging settings; initialised with `LogSettings::default()` on connect.
    pub(crate) log_settings: LogSettings,
    /// Session information returned by `ExaWebSocket::new` when the session was opened.
    session_info: SessionInfo,
}
39
40impl ExaConnection {
41 pub fn server(&self) -> SocketAddr {
43 self.ws.server()
44 }
45
46 pub fn attributes(&self) -> &ExaAttributes {
48 &self.ws.attributes
49 }
50
51 pub fn attributes_mut(&mut self) -> &mut ExaAttributes {
56 &mut self.ws.attributes
57 }
58
59 pub async fn flush_attributes(&mut self) -> SqlxResult<()> {
65 SetAttributes::default().future(&mut self.ws).await
66 }
67
68 pub fn session_info(&self) -> &SessionInfo {
70 &self.session_info
71 }
72
73 pub(crate) async fn establish(opts: &ExaConnectOptions) -> SqlxResult<Self> {
74 let mut error = SqlxError::Configuration("Could not connect to Exasol".into());
75 let mut resolved = Vec::with_capacity(opts.hosts.len());
76
77 for (host, port) in &opts.hosts {
78 let h = host.clone();
79 let port = *port;
80
81 let sock_addrs =
82 sqlx_core::rt::spawn_blocking(move || (h.as_ref(), port).to_socket_addrs()).await?;
83
84 for sock_addr in sock_addrs {
85 resolved.push((host, sock_addr));
86 }
87 }
88
89 resolved.shuffle(&mut thread_rng());
91
92 let mut ip_buf = String::new();
94
95 for (host, sock_addr) in resolved {
97 let (ip, port) = (sock_addr.ip(), sock_addr.port());
98 write!(&mut ip_buf, "{ip}")
99 .map_err(From::from)
100 .map_err(SqlxError::Configuration)?;
101
102 let wrapper = WithExaSocket(sock_addr);
103 let with_socket = WithMaybeTlsExaSocket::new(wrapper, host.as_ref(), opts.into());
104 let socket_res = sqlx_core::net::connect_tcp(&ip_buf, port, with_socket).await;
105
106 ip_buf.clear();
108
109 let (socket, with_tls) = match socket_res {
111 Ok(Ok((socket, with_tls))) => (socket, with_tls),
112 Ok(Err(err)) | Err(err) => {
113 error = err;
114 continue;
115 }
116 };
117
118 match ExaWebSocket::new(host.as_ref(), port, socket, opts.try_into()?, with_tls).await {
119 Err(err) => error = err,
120 Ok((ws, session_info)) => {
122 let mut con = Self {
123 ws,
124 log_settings: LogSettings::default(),
125 session_info,
126 };
127
128 con.configure_session().await?;
129 return Ok(con);
130 }
131 }
132 }
133
134 Err(error)
136 }
137
138 async fn configure_session(&mut self) -> SqlxResult<()> {
140 self.execute("ALTER SESSION SET HASHTYPE_FORMAT = 'HEX';")
143 .await?;
144 Ok(())
145 }
146}
147
148impl Connection for ExaConnection {
149 type Database = Exasol;
150
151 type Options = ExaConnectOptions;
152
153 async fn close(mut self) -> SqlxResult<()> {
154 Disconnect::default().future(&mut self.ws).await?;
155 self.ws.close().await?;
156 Ok(())
157 }
158
159 async fn close_hard(mut self) -> SqlxResult<()> {
160 self.ws.close().await
161 }
162
163 async fn ping(&mut self) -> SqlxResult<()> {
164 self.ws.ping().await
165 }
166
167 async fn begin(&mut self) -> SqlxResult<Transaction<'_, Self::Database>>
168 where
169 Self: Sized,
170 {
171 Transaction::begin(self, None).await
172 }
173
174 fn shrink_buffers(&mut self) {}
175
176 async fn flush(&mut self) -> SqlxResult<()> {
177 if let Some(future) = self.ws.pending_close.take() {
178 future.future(&mut self.ws).await?;
179 }
180
181 if let Some(future) = self.ws.pending_rollback.take() {
182 future.future(&mut self.ws).await?;
183 }
184
185 Ok(())
186 }
187
188 fn should_flush(&self) -> bool {
189 self.ws.pending_close.is_some() || self.ws.pending_rollback.is_some()
190 }
191
192 fn cached_statements_size(&self) -> usize
193 where
194 Self::Database: sqlx_core::database::HasStatementCache,
195 {
196 self.ws.statement_cache.len()
197 }
198
199 async fn clear_cached_statements(&mut self) -> SqlxResult<()>
200 where
201 Self::Database: sqlx_core::database::HasStatementCache,
202 {
203 while let Some(prep) = self.ws.statement_cache.remove_lru() {
204 ClosePrepared::new(prep.statement_handle)
205 .future(&mut self.ws)
206 .await?;
207 }
208
209 Ok(())
210 }
211}
212
#[cfg(test)]
#[cfg(feature = "migrate")]
#[allow(clippy::large_futures, reason = "silencing clippy")]
mod tests {
    use futures_util::TryStreamExt;
    use sqlx::Executor;
    use sqlx_core::{error::BoxDynError, pool::PoolOptions};

    use crate::{ExaConnectOptions, Exasol};

    /// With a cache capacity of one, executing a second statement is expected to
    /// evict the first one from the statement cache.
    #[sqlx::test]
    async fn test_stmt_cache(
        pool_opts: PoolOptions<Exasol>,
        mut exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        const FIRST_SQL: &str = "SELECT 1 FROM dual";
        const SECOND_SQL: &str = "SELECT 2 FROM dual";

        exa_opts.statement_cache_capacity = 1;

        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut conn = pool.acquire().await?;

        // Nothing has been prepared yet.
        assert!(!conn.as_mut().ws.statement_cache.contains_key(FIRST_SQL));
        assert!(!conn.as_mut().ws.statement_cache.contains_key(SECOND_SQL));

        // The first statement gets cached.
        sqlx::query(FIRST_SQL).execute(&mut *conn).await?;
        assert!(conn.as_mut().ws.statement_cache.contains_key(FIRST_SQL));
        assert!(!conn.as_mut().ws.statement_cache.contains_key(SECOND_SQL));

        // The second statement replaces the first one.
        sqlx::query(SECOND_SQL).execute(&mut *conn).await?;
        assert!(!conn.as_mut().ws.statement_cache.contains_key(FIRST_SQL));
        assert!(conn.as_mut().ws.statement_cache.contains_key(SECOND_SQL));

        Ok(())
    }

    /// Connecting without a schema must leave `CURRENT_SCHEMA` unset.
    #[sqlx::test]
    async fn test_schema_none_selected(
        pool_opts: PoolOptions<Exasol>,
        mut exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        exa_opts.schema = None;

        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut conn = pool.acquire().await?;

        let current_schema: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *conn)
            .await?;

        assert!(current_schema.is_none());

        Ok(())
    }

    /// Dropping a partially consumed result stream must queue a close request,
    /// which stays queued across further fetches and is gone after
    /// `flush_attributes` runs.
    #[sqlx::test]
    async fn test_connection_result_set_auto_close(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        const SELECT_ALL: &str = "SELECT * FROM CLOSE_RESULTS_TEST";

        let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
        let mut conn = pool.acquire().await?;

        conn.execute("CREATE TABLE CLOSE_RESULTS_TEST ( col DECIMAL(3, 0) );")
            .await?;

        sqlx::query("INSERT INTO CLOSE_RESULTS_TEST VALUES(?)")
            .bind(vec![1i8; 10000])
            .execute(&mut *conn)
            .await?;

        // Nothing is queued before the first partially read stream is dropped.
        assert!(conn.ws.pending_close.is_none());
        let _ = conn.fetch(SELECT_ALL).try_next().await?;
        assert!(conn.ws.pending_close.is_some());

        // Further fetches (whose results are deliberately ignored) keep a close
        // request queued.
        for _ in 0..3 {
            let _ = conn.fetch(SELECT_ALL).try_next().await;
            assert!(conn.ws.pending_close.is_some());
        }

        conn.flush_attributes().await?;
        assert!(conn.ws.pending_close.is_none());

        Ok(())
    }
}