1#[cfg(feature = "etl")]
2pub mod etl;
3mod executor;
4mod stream;
5pub mod websocket;
6
7use std::net::SocketAddr;
8
9use futures_core::future::BoxFuture;
10use futures_util::{FutureExt, SinkExt};
11use rand::{seq::SliceRandom, thread_rng};
12use sqlx_core::{
13 connection::{Connection, LogSettings},
14 transaction::Transaction,
15};
16use websocket::{socket::WithExaSocket, ExaWebSocket};
17
18use crate::{
19 connection::websocket::{
20 future::{ClosePrepared, Disconnect, SetAttributes, WebSocketFuture},
21 WithMaybeTlsExaSocket,
22 },
23 database::Exasol,
24 options::ExaConnectOptions,
25 responses::{ExaAttributes, SessionInfo},
26 SqlxError, SqlxResult,
27};
28
29#[derive(Debug)]
31pub struct ExaConnection {
32 pub(crate) ws: ExaWebSocket,
33 session_info: SessionInfo,
34 log_settings: LogSettings,
35}
36
37impl ExaConnection {
38 pub fn socket_addr(&self) -> SocketAddr {
40 self.ws.socket_addr()
41 }
42
43 pub fn attributes(&self) -> &ExaAttributes {
45 &self.ws.attributes
46 }
47
48 pub fn attributes_mut(&mut self) -> &mut ExaAttributes {
53 &mut self.ws.attributes
54 }
55
56 pub async fn flush_attributes(&mut self) -> SqlxResult<()> {
62 SetAttributes::default().future(&mut self.ws).await
63 }
64
65 pub fn session_info(&self) -> &SessionInfo {
67 &self.session_info
68 }
69
70 pub(crate) async fn establish(opts: &ExaConnectOptions) -> SqlxResult<Self> {
71 let mut ws_result = Err(SqlxError::Configuration("No hosts found".into()));
72
73 let mut indices = (0..opts.hosts_details.len()).collect::<Vec<_>>();
77 indices.shuffle(&mut thread_rng());
78
79 for idx in indices {
81 let (host, addrs) = opts
83 .hosts_details
84 .get(idx)
85 .expect("hosts list index must be valid");
86
87 for socket_addr in addrs {
89 let wrapper = WithExaSocket(*socket_addr);
90 let with_socket = WithMaybeTlsExaSocket::new(wrapper, host, opts.into());
91 let socket_res = sqlx_core::net::connect_tcp(host, opts.port, with_socket).await;
92
93 let (socket, with_tls) = match socket_res {
95 Ok(Ok((socket, with_tls))) => (socket, with_tls),
96 Ok(Err(err)) | Err(err) => {
97 ws_result = Err(err);
98 continue;
99 }
100 };
101
102 match ExaWebSocket::new(host, opts.port, socket, opts.into(), with_tls).await {
104 Ok(ws) => {
105 ws_result = Ok(ws);
106 break;
107 }
108 Err(err) => ws_result = Err(err),
109 }
110 }
111 }
112
113 let (ws, session_info) = ws_result?;
114 let con = Self {
115 ws,
116 log_settings: LogSettings::default(),
117 session_info,
118 };
119
120 Ok(con)
121 }
122}
123
124impl Connection for ExaConnection {
125 type Database = Exasol;
126
127 type Options = ExaConnectOptions;
128
129 fn close(mut self) -> BoxFuture<'static, SqlxResult<()>> {
130 Box::pin(async move {
131 Disconnect::default().future(&mut self.ws).await?;
132 self.ws.close().await?;
133 Ok(())
134 })
135 }
136
137 fn close_hard(mut self) -> BoxFuture<'static, SqlxResult<()>> {
138 Box::pin(async move { self.ws.close().await })
139 }
140
141 fn ping(&mut self) -> BoxFuture<'_, SqlxResult<()>> {
142 self.ws.ping().boxed()
143 }
144
145 fn begin(&mut self) -> BoxFuture<'_, SqlxResult<Transaction<'_, Self::Database>>>
146 where
147 Self: Sized,
148 {
149 Transaction::begin(self, None)
150 }
151
152 fn shrink_buffers(&mut self) {}
153
154 fn flush(&mut self) -> BoxFuture<'_, SqlxResult<()>> {
155 Box::pin(async {
156 if let Some(future) = self.ws.pending_close.take() {
157 future.future(&mut self.ws).await?;
158 }
159
160 if let Some(future) = self.ws.pending_rollback.take() {
161 future.future(&mut self.ws).await?;
162 }
163
164 Ok(())
165 })
166 }
167
168 fn should_flush(&self) -> bool {
169 self.ws.pending_close.is_some() || self.ws.pending_rollback.is_some()
170 }
171
172 fn cached_statements_size(&self) -> usize
173 where
174 Self::Database: sqlx_core::database::HasStatementCache,
175 {
176 self.ws.statement_cache.len()
177 }
178
179 fn clear_cached_statements(&mut self) -> BoxFuture<'_, SqlxResult<()>>
180 where
181 Self::Database: sqlx_core::database::HasStatementCache,
182 {
183 Box::pin(async {
184 while let Some((_, prep)) = self.ws.statement_cache.pop_lru() {
185 ClosePrepared::new(prep.statement_handle)
186 .future(&mut self.ws)
187 .await?;
188 }
189
190 Ok(())
191 })
192 }
193}
194
195#[cfg(test)]
196#[cfg(feature = "migrate")]
197mod tests {
198 use std::num::NonZeroUsize;
199
200 use futures_util::TryStreamExt;
201 use sqlx::{query, Connection, Executor};
202 use sqlx_core::{error::BoxDynError, pool::PoolOptions};
203
204 use crate::{ExaConnectOptions, ExaQueryResult, Exasol};
205
// With the `compression` feature compiled in: connect with compression turned
// on and check that a freshly created schema is reported as the current one.
#[cfg(feature = "compression")]
#[ignore]
#[sqlx::test]
async fn test_compression_feature(
    pool_opts: PoolOptions<Exasol>,
    mut exa_opts: ExaConnectOptions,
) -> Result<(), BoxDynError> {
    exa_opts.compression = true;

    let schema = "TEST_SWITCH_SCHEMA";
    let create_stmt = format!("CREATE SCHEMA IF NOT EXISTS {schema};");
    let drop_stmt = format!("DROP SCHEMA IF EXISTS {schema} CASCADE;");

    let pool = pool_opts.connect_with(exa_opts).await?;
    let mut con = pool.acquire().await?;

    con.execute(create_stmt.as_str()).await?;

    let current: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
        .fetch_one(&mut *con)
        .await?;

    // Clean up before asserting so the schema is dropped even on failure.
    con.execute(drop_stmt.as_str()).await?;

    assert_eq!(schema, current);

    Ok(())
}
233
234 #[cfg(not(feature = "compression"))]
235 #[sqlx::test]
236 async fn test_compression_no_feature(
237 pool_opts: PoolOptions<Exasol>,
238 mut exa_opts: ExaConnectOptions,
239 ) {
240 exa_opts.compression = true;
241 assert!(pool_opts.connect_with(exa_opts).await.is_err());
242 }
243
244 #[sqlx::test]
245 async fn test_stmt_cache(
246 pool_opts: PoolOptions<Exasol>,
247 mut exa_opts: ExaConnectOptions,
248 ) -> Result<(), BoxDynError> {
249 exa_opts.statement_cache_capacity = NonZeroUsize::new(1).unwrap();
251
252 let pool = pool_opts.connect_with(exa_opts).await?;
253 let mut con = pool.acquire().await?;
254
255 let sql1 = "SELECT 1 FROM dual";
256 let sql2 = "SELECT 2 FROM dual";
257
258 assert!(!con.as_ref().ws.statement_cache.contains(sql1));
259 assert!(!con.as_ref().ws.statement_cache.contains(sql2));
260
261 query(sql1).execute(&mut *con).await?;
262 assert!(con.as_ref().ws.statement_cache.contains(sql1));
263 assert!(!con.as_ref().ws.statement_cache.contains(sql2));
264
265 query(sql2).execute(&mut *con).await?;
266 assert!(!con.as_ref().ws.statement_cache.contains(sql1));
267 assert!(con.as_ref().ws.statement_cache.contains(sql2));
268
269 Ok(())
270 }
271
272 #[sqlx::test]
273 async fn test_schema_none_selected(
274 pool_opts: PoolOptions<Exasol>,
275 mut exa_opts: ExaConnectOptions,
276 ) -> Result<(), BoxDynError> {
277 exa_opts.schema = None;
278 let pool = pool_opts.connect_with(exa_opts).await?;
279 let mut con = pool.acquire().await?;
280
281 let schema: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
282 .fetch_one(&mut *con)
283 .await?;
284
285 assert!(schema.is_none());
286
287 Ok(())
288 }
289
290 #[sqlx::test]
291 async fn test_schema_selected(
292 pool_opts: PoolOptions<Exasol>,
293 exa_opts: ExaConnectOptions,
294 ) -> Result<(), BoxDynError> {
295 let pool = pool_opts.connect_with(exa_opts).await?;
296 let mut con = pool.acquire().await?;
297
298 let schema: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
299 .fetch_one(&mut *con)
300 .await?;
301
302 assert!(schema.is_some());
303
304 Ok(())
305 }
306
307 #[sqlx::test]
308 async fn test_schema_switch(
309 pool_opts: PoolOptions<Exasol>,
310 exa_opts: ExaConnectOptions,
311 ) -> Result<(), BoxDynError> {
312 let pool = pool_opts.connect_with(exa_opts).await?;
313 let mut con = pool.acquire().await?;
314 let schema = "TEST_SWITCH_SCHEMA";
315
316 con.execute(format!("CREATE SCHEMA IF NOT EXISTS {schema};").as_str())
317 .await?;
318
319 let new_schema: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
320 .fetch_one(&mut *con)
321 .await?;
322
323 con.execute(format!("DROP SCHEMA IF EXISTS {schema} CASCADE;").as_str())
324 .await?;
325
326 assert_eq!(schema, new_schema);
327
328 Ok(())
329 }
330
331 #[sqlx::test]
332 async fn test_schema_switch_from_attr(
333 pool_opts: PoolOptions<Exasol>,
334 exa_opts: ExaConnectOptions,
335 ) -> Result<(), BoxDynError> {
336 let pool = pool_opts.connect_with(exa_opts).await?;
337 let mut con = pool.acquire().await?;
338
339 let orig_schema: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
340 .fetch_one(&mut *con)
341 .await?;
342
343 let schema = "TEST_SWITCH_SCHEMA";
344
345 con.execute(format!("CREATE SCHEMA IF NOT EXISTS {schema};").as_str())
346 .await?;
347
348 con.attributes_mut().set_current_schema(orig_schema.clone());
349 con.flush_attributes().await?;
350
351 let new_schema: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
352 .fetch_one(&mut *con)
353 .await?;
354
355 assert_eq!(orig_schema, new_schema);
356
357 Ok(())
358 }
359
360 #[sqlx::test]
361 async fn test_schema_close_and_empty_attr(
362 pool_opts: PoolOptions<Exasol>,
363 exa_opts: ExaConnectOptions,
364 ) -> Result<(), BoxDynError> {
365 let pool = pool_opts.connect_with(exa_opts).await?;
366 let mut con = pool.acquire().await?;
367
368 let orig_schema: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
369 .fetch_one(&mut *con)
370 .await?;
371
372 assert_eq!(
373 con.attributes().current_schema(),
374 Some(orig_schema.as_str())
375 );
376
377 con.execute("CLOSE SCHEMA").await?;
378 assert_eq!(con.attributes().current_schema(), None);
379
380 Ok(())
381 }
382
383 #[sqlx::test]
384 async fn test_comment_stmts(
385 pool_opts: PoolOptions<Exasol>,
386 exa_opts: ExaConnectOptions,
387 ) -> Result<(), BoxDynError> {
388 let pool = pool_opts.connect_with(exa_opts).await?;
389 let mut con = pool.acquire().await?;
390
391 con.execute_many("/* this is a comment */")
392 .try_collect::<ExaQueryResult>()
393 .await?;
394 con.execute("-- this is a comment").await?;
395
396 Ok(())
397 }
398
399 #[sqlx::test]
400 async fn test_connection_flush_on_drop(
401 pool_opts: PoolOptions<Exasol>,
402 exa_opts: ExaConnectOptions,
403 ) -> Result<(), BoxDynError> {
404 let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
406 pool.execute("CREATE TABLE TRANSACTIONS_TEST ( col DECIMAL(1, 0) );")
407 .await?;
408
409 {
410 let mut conn = pool.acquire().await?;
411 let mut tx = conn.begin().await?;
412 tx.execute("INSERT INTO TRANSACTIONS_TEST VALUES(1)")
413 .await?;
414 }
415
416 let mut conn = pool.acquire().await?;
417 {
418 let mut tx = conn.begin().await?;
419 tx.execute("INSERT INTO TRANSACTIONS_TEST VALUES(1)")
420 .await?;
421 }
422
423 {
424 let mut tx = conn.begin().await?;
425 tx.execute("INSERT INTO TRANSACTIONS_TEST VALUES(1)")
426 .await?;
427 }
428
429 drop(conn);
430
431 let inserted = pool
432 .fetch_all("SELECT * FROM TRANSACTIONS_TEST")
433 .await?
434 .len();
435
436 assert_eq!(inserted, 0);
437 Ok(())
438 }
439
440 #[sqlx::test]
441 async fn test_connection_result_set_auto_close(
442 pool_opts: PoolOptions<Exasol>,
443 exa_opts: ExaConnectOptions,
444 ) -> Result<(), BoxDynError> {
445 let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
447 let mut conn = pool.acquire().await?;
448 conn.execute("CREATE TABLE CLOSE_RESULTS_TEST ( col DECIMAL(1, 0) );")
449 .await?;
450
451 query("INSERT INTO CLOSE_RESULTS_TEST VALUES(?)")
452 .bind([1; 10000])
453 .execute(&mut *conn)
454 .await?;
455
456 assert!(conn.ws.pending_close.is_none());
457 let _ = conn
458 .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
459 .try_next()
460 .await?;
461
462 assert!(conn.ws.pending_close.is_some());
463 let _ = conn
464 .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
465 .try_next()
466 .await;
467
468 assert!(conn.ws.pending_close.is_some());
469 let _ = conn
470 .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
471 .try_next()
472 .await;
473
474 assert!(conn.ws.pending_close.is_some());
475 let _ = conn
476 .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
477 .try_next()
478 .await;
479
480 assert!(conn.ws.pending_close.is_some());
481 conn.flush_attributes().await?;
482
483 assert!(conn.ws.pending_close.is_none());
484 Ok(())
485 }
486}