sqlx_exasol/connection/
mod.rs

1#[cfg(feature = "etl")]
2pub mod etl;
3mod executor;
4mod stream;
5pub mod websocket;
6
7use std::net::SocketAddr;
8
9use futures_core::future::BoxFuture;
10use futures_util::{FutureExt, SinkExt};
11use rand::{seq::SliceRandom, thread_rng};
12use sqlx_core::{
13    connection::{Connection, LogSettings},
14    transaction::Transaction,
15};
16use websocket::{socket::WithExaSocket, ExaWebSocket};
17
18use crate::{
19    connection::websocket::{
20        future::{ClosePrepared, Disconnect, SetAttributes, WebSocketFuture},
21        WithMaybeTlsExaSocket,
22    },
23    database::Exasol,
24    options::ExaConnectOptions,
25    responses::{ExaAttributes, SessionInfo},
26    SqlxError, SqlxResult,
27};
28
29/// A connection to the Exasol database. Implementor of [`Connection`].
30#[derive(Debug)]
31pub struct ExaConnection {
32    pub(crate) ws: ExaWebSocket,
33    session_info: SessionInfo,
34    log_settings: LogSettings,
35}
36
37impl ExaConnection {
38    /// Returns the Exasol server socket address that we're connected to.
39    pub fn socket_addr(&self) -> SocketAddr {
40        self.ws.socket_addr()
41    }
42
43    /// Returns a reference of the [`ExaAttributes`] used in this connection.
44    pub fn attributes(&self) -> &ExaAttributes {
45        &self.ws.attributes
46    }
47
48    /// Allows setting connection attributes through the driver.
49    ///
50    /// Note that attributes will only reach Exasol after a statement is
51    /// executed or after explicitly calling the `flush_attributes()` method.
52    pub fn attributes_mut(&mut self) -> &mut ExaAttributes {
53        &mut self.ws.attributes
54    }
55
56    /// Flushes the current [`ExaAttributes`] to Exasol.
57    ///
58    /// # Errors
59    ///
60    /// Will return an error if sending the attributes fails.
61    pub async fn flush_attributes(&mut self) -> SqlxResult<()> {
62        SetAttributes::default().future(&mut self.ws).await
63    }
64
65    /// Returns a reference to the [`SessionInfo`] related to this connection.
66    pub fn session_info(&self) -> &SessionInfo {
67        &self.session_info
68    }
69
70    pub(crate) async fn establish(opts: &ExaConnectOptions) -> SqlxResult<Self> {
71        let mut ws_result = Err(SqlxError::Configuration("No hosts found".into()));
72
73        // We want to try and randomly connect to nodes, if multiple were provided.
74        // But since the RNG is not Send and cloning the hosts would be more expensive,
75        // we create a vector of the indices and shuffle these instead.
76        let mut indices = (0..opts.hosts_details.len()).collect::<Vec<_>>();
77        indices.shuffle(&mut thread_rng());
78
79        // For each host parsed from the connection string
80        for idx in indices {
81            // We know the index is valid since it's between 0..hosts.len()
82            let (host, addrs) = opts
83                .hosts_details
84                .get(idx)
85                .expect("hosts list index must be valid");
86
87            // For each socket address resolved from the host
88            for socket_addr in addrs {
89                let wrapper = WithExaSocket(*socket_addr);
90                let with_socket = WithMaybeTlsExaSocket::new(wrapper, host, opts.into());
91                let socket_res = sqlx_core::net::connect_tcp(host, opts.port, with_socket).await;
92
93                // Continue if the future to connect a socket failed
94                let (socket, with_tls) = match socket_res {
95                    Ok(Ok((socket, with_tls))) => (socket, with_tls),
96                    Ok(Err(err)) | Err(err) => {
97                        ws_result = Err(err);
98                        continue;
99                    }
100                };
101
102                // Break if we successfully connect a websocket.
103                match ExaWebSocket::new(host, opts.port, socket, opts.into(), with_tls).await {
104                    Ok(ws) => {
105                        ws_result = Ok(ws);
106                        break;
107                    }
108                    Err(err) => ws_result = Err(err),
109                }
110            }
111        }
112
113        let (ws, session_info) = ws_result?;
114        let con = Self {
115            ws,
116            log_settings: LogSettings::default(),
117            session_info,
118        };
119
120        Ok(con)
121    }
122}
123
124impl Connection for ExaConnection {
125    type Database = Exasol;
126
127    type Options = ExaConnectOptions;
128
129    fn close(mut self) -> BoxFuture<'static, SqlxResult<()>> {
130        Box::pin(async move {
131            Disconnect::default().future(&mut self.ws).await?;
132            self.ws.close().await?;
133            Ok(())
134        })
135    }
136
137    fn close_hard(mut self) -> BoxFuture<'static, SqlxResult<()>> {
138        Box::pin(async move { self.ws.close().await })
139    }
140
141    fn ping(&mut self) -> BoxFuture<'_, SqlxResult<()>> {
142        self.ws.ping().boxed()
143    }
144
145    fn begin(&mut self) -> BoxFuture<'_, SqlxResult<Transaction<'_, Self::Database>>>
146    where
147        Self: Sized,
148    {
149        Transaction::begin(self, None)
150    }
151
152    fn shrink_buffers(&mut self) {}
153
154    fn flush(&mut self) -> BoxFuture<'_, SqlxResult<()>> {
155        Box::pin(async {
156            if let Some(future) = self.ws.pending_close.take() {
157                future.future(&mut self.ws).await?;
158            }
159
160            if let Some(future) = self.ws.pending_rollback.take() {
161                future.future(&mut self.ws).await?;
162            }
163
164            Ok(())
165        })
166    }
167
168    fn should_flush(&self) -> bool {
169        self.ws.pending_close.is_some() || self.ws.pending_rollback.is_some()
170    }
171
172    fn cached_statements_size(&self) -> usize
173    where
174        Self::Database: sqlx_core::database::HasStatementCache,
175    {
176        self.ws.statement_cache.len()
177    }
178
179    fn clear_cached_statements(&mut self) -> BoxFuture<'_, SqlxResult<()>>
180    where
181        Self::Database: sqlx_core::database::HasStatementCache,
182    {
183        Box::pin(async {
184            while let Some((_, prep)) = self.ws.statement_cache.pop_lru() {
185                ClosePrepared::new(prep.statement_handle)
186                    .future(&mut self.ws)
187                    .await?;
188            }
189
190            Ok(())
191        })
192    }
193}
194
#[cfg(test)]
#[cfg(feature = "migrate")]
mod tests {
    use std::num::NonZeroUsize;

    use futures_util::TryStreamExt;
    use sqlx::{query, Connection, Executor};
    use sqlx_core::{error::BoxDynError, pool::PoolOptions};

    use crate::{ExaConnectOptions, ExaQueryResult, Exasol};

    /// With the `compression` feature enabled, a connection requesting
    /// compression must work: a schema is created, checked to be the current
    /// one and dropped again.
    #[cfg(feature = "compression")]
    #[ignore]
    #[sqlx::test]
    async fn test_compression_feature(
        pool_opts: PoolOptions<Exasol>,
        mut exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        exa_opts.compression = true;

        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let schema_name = "TEST_SWITCH_SCHEMA";
        let create_stmt = format!("CREATE SCHEMA IF NOT EXISTS {schema_name};");
        let drop_stmt = format!("DROP SCHEMA IF EXISTS {schema_name} CASCADE;");

        con.execute(create_stmt.as_str()).await?;

        let current: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        // Clean up before asserting so a failure does not leave the schema behind.
        con.execute(drop_stmt.as_str()).await?;

        assert_eq!(schema_name, current);

        Ok(())
    }

    /// Without the `compression` feature, requesting compression must make
    /// connecting fail.
    #[cfg(not(feature = "compression"))]
    #[sqlx::test]
    async fn test_compression_no_feature(
        pool_opts: PoolOptions<Exasol>,
        mut exa_opts: ExaConnectOptions,
    ) {
        exa_opts.compression = true;

        let connect_res = pool_opts.connect_with(exa_opts).await;
        assert!(connect_res.is_err());
    }

    /// With a cache capacity of one, executing a second statement must evict
    /// the first one from the statement cache.
    #[sqlx::test]
    async fn test_stmt_cache(
        pool_opts: PoolOptions<Exasol>,
        mut exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        /// Whether `sql` currently has an entry in the connection's statement cache.
        fn is_cached(con: &super::ExaConnection, sql: &str) -> bool {
            con.ws.statement_cache.contains(sql)
        }

        // Set a low cache size
        exa_opts.statement_cache_capacity = NonZeroUsize::new(1).unwrap();

        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let first_sql = "SELECT 1 FROM dual";
        let second_sql = "SELECT 2 FROM dual";

        // Nothing is cached on a fresh connection.
        assert!(!is_cached(con.as_ref(), first_sql));
        assert!(!is_cached(con.as_ref(), second_sql));

        query(first_sql).execute(&mut *con).await?;
        assert!(is_cached(con.as_ref(), first_sql));
        assert!(!is_cached(con.as_ref(), second_sql));

        // The second statement takes the only slot.
        query(second_sql).execute(&mut *con).await?;
        assert!(!is_cached(con.as_ref(), first_sql));
        assert!(is_cached(con.as_ref(), second_sql));

        Ok(())
    }

    /// Connecting without a schema in the options must leave no schema open.
    #[sqlx::test]
    async fn test_schema_none_selected(
        pool_opts: PoolOptions<Exasol>,
        mut exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        exa_opts.schema = None;

        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let current: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        assert!(current.is_none());

        Ok(())
    }

    /// Connecting with the provided test options must open a schema.
    #[sqlx::test]
    async fn test_schema_selected(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let current: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        assert!(current.is_some());

        Ok(())
    }

    /// After creating a schema, it must be reported as the current schema.
    #[sqlx::test]
    async fn test_schema_switch(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let schema_name = "TEST_SWITCH_SCHEMA";
        let create_stmt = format!("CREATE SCHEMA IF NOT EXISTS {schema_name};");
        let drop_stmt = format!("DROP SCHEMA IF EXISTS {schema_name} CASCADE;");

        con.execute(create_stmt.as_str()).await?;

        let current: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        // Clean up before asserting so a failure does not leave the schema behind.
        con.execute(drop_stmt.as_str()).await?;

        assert_eq!(schema_name, current);

        Ok(())
    }

    /// Setting the current schema through the connection attributes and
    /// flushing them must switch back to the original schema.
    #[sqlx::test]
    async fn test_schema_switch_from_attr(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let orig_schema: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        let schema_name = "TEST_SWITCH_SCHEMA";
        let create_stmt = format!("CREATE SCHEMA IF NOT EXISTS {schema_name};");
        con.execute(create_stmt.as_str()).await?;

        let attributes = con.attributes_mut();
        attributes.set_current_schema(orig_schema.clone());
        con.flush_attributes().await?;

        let current: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        assert_eq!(orig_schema, current);

        Ok(())
    }

    /// The current schema attribute must mirror the open schema and become
    /// `None` once the schema is closed.
    #[sqlx::test]
    async fn test_schema_close_and_empty_attr(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let orig_schema: String = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
            .fetch_one(&mut *con)
            .await?;

        let expected = Some(orig_schema.as_str());
        assert_eq!(con.attributes().current_schema(), expected);

        con.execute("CLOSE SCHEMA").await?;
        assert_eq!(con.attributes().current_schema(), None);

        Ok(())
    }

    /// Statements that consist only of a comment must execute without error.
    #[sqlx::test]
    async fn test_comment_stmts(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let pool = pool_opts.connect_with(exa_opts).await?;
        let mut con = pool.acquire().await?;

        let _: ExaQueryResult = con
            .execute_many("/* this is a comment */")
            .try_collect()
            .await?;

        con.execute("-- this is a comment").await?;

        Ok(())
    }

    /// Transactions dropped without a commit must be rolled back, so none of
    /// the inserted rows may be visible afterwards.
    #[sqlx::test]
    async fn test_connection_flush_on_drop(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let insert_stmt = "INSERT INTO TRANSACTIONS_TEST VALUES(1)";

        // Only allow one connection
        let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
        pool.execute("CREATE TABLE TRANSACTIONS_TEST ( col DECIMAL(1, 0) );")
            .await?;

        // Transaction and connection are dropped together.
        {
            let mut conn = pool.acquire().await?;
            let mut tx = conn.begin().await?;
            tx.execute(insert_stmt).await?;
        }

        // Two transactions dropped one after the other on the same connection.
        let mut conn = pool.acquire().await?;
        for _ in 0..2 {
            let mut tx = conn.begin().await?;
            tx.execute(insert_stmt).await?;
        }

        drop(conn);

        let rows = pool.fetch_all("SELECT * FROM TRANSACTIONS_TEST").await?;
        let inserted = rows.len();

        assert_eq!(inserted, 0);
        Ok(())
    }

    /// A result set that is not fully consumed must leave a pending close
    /// behind, which stays set across further fetches and is cleared after
    /// flushing the attributes.
    #[sqlx::test]
    async fn test_connection_result_set_auto_close(
        pool_opts: PoolOptions<Exasol>,
        exa_opts: ExaConnectOptions,
    ) -> Result<(), BoxDynError> {
        let select_stmt = "SELECT * FROM CLOSE_RESULTS_TEST";

        // Only allow one connection
        let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
        let mut conn = pool.acquire().await?;
        conn.execute("CREATE TABLE CLOSE_RESULTS_TEST ( col DECIMAL(1, 0) );")
            .await?;

        query("INSERT INTO CLOSE_RESULTS_TEST VALUES(?)")
            .bind([1; 10000])
            .execute(&mut *conn)
            .await?;

        assert!(conn.ws.pending_close.is_none());
        let _ = conn.fetch(select_stmt).try_next().await?;

        // Every further partially consumed fetch must keep a close pending;
        // the outcome of these fetches themselves is ignored.
        for _ in 0..3 {
            assert!(conn.ws.pending_close.is_some());
            let _ = conn.fetch(select_stmt).try_next().await;
        }

        assert!(conn.ws.pending_close.is_some());
        conn.flush_attributes().await?;

        assert!(conn.ws.pending_close.is_none());
        Ok(())
    }
}
486}