sqlx_exasol_impl/connection/
mod.rs

1#[cfg(feature = "etl")]
2pub mod etl;
3mod executor;
4pub mod stream;
5pub mod websocket;
6
7use std::{
8    fmt::Write,
9    net::{SocketAddr, ToSocketAddrs},
10};
11
12use futures_util::SinkExt;
13use rand::{seq::SliceRandom, thread_rng};
14use sqlx_core::{
15    connection::{Connection, LogSettings},
16    executor::Executor,
17    transaction::Transaction,
18};
19use websocket::{socket::WithExaSocket, ExaWebSocket};
20
21use crate::{
22    connection::websocket::{
23        future::{ClosePrepared, Disconnect, SetAttributes, WebSocketFuture},
24        WithMaybeTlsExaSocket,
25    },
26    database::Exasol,
27    options::ExaConnectOptions,
28    responses::{ExaAttributes, SessionInfo},
29    SqlxError, SqlxResult,
30};
31
32/// A connection to the Exasol database. Implementor of [`Connection`].
33#[derive(Debug)]
34pub struct ExaConnection {
35    pub(crate) ws: ExaWebSocket,
36    pub(crate) log_settings: LogSettings,
37    session_info: SessionInfo,
38}
39
40impl ExaConnection {
41    /// Returns the Exasol server socket address that we're connected to.
42    pub fn server(&self) -> SocketAddr {
43        self.ws.server()
44    }
45
46    /// Returns a reference of the [`ExaAttributes`] used in this connection.
47    pub fn attributes(&self) -> &ExaAttributes {
48        &self.ws.attributes
49    }
50
51    /// Allows setting connection attributes through the driver.
52    ///
53    /// Note that attributes will only reach Exasol after a statement is
54    /// executed or after explicitly calling the `flush_attributes()` method.
55    pub fn attributes_mut(&mut self) -> &mut ExaAttributes {
56        &mut self.ws.attributes
57    }
58
59    /// Flushes the current [`ExaAttributes`] to Exasol.
60    ///
61    /// # Errors
62    ///
63    /// Will return an error if sending the attributes fails.
64    pub async fn flush_attributes(&mut self) -> SqlxResult<()> {
65        SetAttributes::default().future(&mut self.ws).await
66    }
67
68    /// Returns a reference to the [`SessionInfo`] related to this connection.
69    pub fn session_info(&self) -> &SessionInfo {
70        &self.session_info
71    }
72
73    pub(crate) async fn establish(opts: &ExaConnectOptions) -> SqlxResult<Self> {
74        let mut error = SqlxError::Configuration("Could not connect to Exasol".into());
75        let mut resolved = Vec::with_capacity(opts.hosts.len());
76
77        for (host, port) in &opts.hosts {
78            let h = host.clone();
79            let port = *port;
80
81            let sock_addrs =
82                sqlx_core::rt::spawn_blocking(move || (h.as_ref(), port).to_socket_addrs()).await?;
83
84            for sock_addr in sock_addrs {
85                resolved.push((host, sock_addr));
86            }
87        }
88
89        // Shuffle the resolved addresses to randomly connect to nodes.
90        resolved.shuffle(&mut thread_rng());
91
92        // Reusable buffer for writing IP address as strings.
93        let mut ip_buf = String::new();
94
95        // Attempt connecting to each resolved socket address until one succeeds.
96        for (host, sock_addr) in resolved {
97            let (ip, port) = (sock_addr.ip(), sock_addr.port());
98            write!(&mut ip_buf, "{ip}")
99                .map_err(From::from)
100                .map_err(SqlxError::Configuration)?;
101
102            let wrapper = WithExaSocket(sock_addr);
103            let with_socket = WithMaybeTlsExaSocket::new(wrapper, host.as_ref(), opts.into());
104            let socket_res = sqlx_core::net::connect_tcp(&ip_buf, port, with_socket).await;
105
106            // Clear the buffer in case we continue iteration.
107            ip_buf.clear();
108
109            // Continue if the future to connect a socket failed.
110            let (socket, with_tls) = match socket_res {
111                Ok(Ok((socket, with_tls))) => (socket, with_tls),
112                Ok(Err(err)) | Err(err) => {
113                    error = err;
114                    continue;
115                }
116            };
117
118            match ExaWebSocket::new(host.as_ref(), port, socket, opts.try_into()?, with_tls).await {
119                Err(err) => error = err,
120                // Return if we successfully connect a websocket.
121                Ok((ws, session_info)) => {
122                    let mut con = Self {
123                        ws,
124                        log_settings: LogSettings::default(),
125                        session_info,
126                    };
127
128                    con.configure_session().await?;
129                    return Ok(con);
130                }
131            }
132        }
133
134        // All attempts failed, return the last error
135        Err(error)
136    }
137
138    /// Sets session parameters for the open connection.
139    async fn configure_session(&mut self) -> SqlxResult<()> {
140        // We rely on this for consistent size output for HASHTYPE columns.
141        // This allows to reliably use UUID at compile-time.
142        self.execute("ALTER SESSION SET HASHTYPE_FORMAT = 'HEX';")
143            .await?;
144        Ok(())
145    }
146}
147
148impl Connection for ExaConnection {
149    type Database = Exasol;
150
151    type Options = ExaConnectOptions;
152
153    async fn close(mut self) -> SqlxResult<()> {
154        Disconnect::default().future(&mut self.ws).await?;
155        self.ws.close().await?;
156        Ok(())
157    }
158
159    async fn close_hard(mut self) -> SqlxResult<()> {
160        self.ws.close().await
161    }
162
163    async fn ping(&mut self) -> SqlxResult<()> {
164        self.ws.ping().await
165    }
166
167    async fn begin(&mut self) -> SqlxResult<Transaction<'_, Self::Database>>
168    where
169        Self: Sized,
170    {
171        Transaction::begin(self, None).await
172    }
173
174    fn shrink_buffers(&mut self) {}
175
176    async fn flush(&mut self) -> SqlxResult<()> {
177        if let Some(future) = self.ws.pending_close.take() {
178            future.future(&mut self.ws).await?;
179        }
180
181        if let Some(future) = self.ws.pending_rollback.take() {
182            future.future(&mut self.ws).await?;
183        }
184
185        Ok(())
186    }
187
188    fn should_flush(&self) -> bool {
189        self.ws.pending_close.is_some() || self.ws.pending_rollback.is_some()
190    }
191
192    fn cached_statements_size(&self) -> usize
193    where
194        Self::Database: sqlx_core::database::HasStatementCache,
195    {
196        self.ws.statement_cache.len()
197    }
198
199    async fn clear_cached_statements(&mut self) -> SqlxResult<()>
200    where
201        Self::Database: sqlx_core::database::HasStatementCache,
202    {
203        while let Some(prep) = self.ws.statement_cache.remove_lru() {
204            ClosePrepared::new(prep.statement_handle)
205                .future(&mut self.ws)
206                .await?;
207        }
208
209        Ok(())
210    }
211}
212
213#[cfg(test)]
214#[cfg(feature = "migrate")]
215#[allow(clippy::large_futures, reason = "silencing clippy")]
216mod tests {
217    use futures_util::TryStreamExt;
218    use sqlx::Executor;
219    use sqlx_core::{error::BoxDynError, pool::PoolOptions};
220
221    use crate::{ExaConnectOptions, Exasol};
222
223    #[sqlx::test]
224    async fn test_stmt_cache(
225        pool_opts: PoolOptions<Exasol>,
226        mut exa_opts: ExaConnectOptions,
227    ) -> Result<(), BoxDynError> {
228        // Set a low cache size
229        exa_opts.statement_cache_capacity = 1;
230
231        let pool = pool_opts.connect_with(exa_opts).await?;
232        let mut con = pool.acquire().await?;
233
234        let sql1 = "SELECT 1 FROM dual";
235        let sql2 = "SELECT 2 FROM dual";
236
237        assert!(!con.as_mut().ws.statement_cache.contains_key(sql1));
238        assert!(!con.as_mut().ws.statement_cache.contains_key(sql2));
239
240        sqlx::query(sql1).execute(&mut *con).await?;
241        assert!(con.as_mut().ws.statement_cache.contains_key(sql1));
242        assert!(!con.as_mut().ws.statement_cache.contains_key(sql2));
243
244        sqlx::query(sql2).execute(&mut *con).await?;
245        assert!(!con.as_mut().ws.statement_cache.contains_key(sql1));
246        assert!(con.as_mut().ws.statement_cache.contains_key(sql2));
247
248        Ok(())
249    }
250
251    #[sqlx::test]
252    async fn test_schema_none_selected(
253        pool_opts: PoolOptions<Exasol>,
254        mut exa_opts: ExaConnectOptions,
255    ) -> Result<(), BoxDynError> {
256        exa_opts.schema = None;
257
258        let pool = pool_opts.connect_with(exa_opts).await?;
259        let mut con = pool.acquire().await?;
260
261        let schema: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
262            .fetch_one(&mut *con)
263            .await?;
264
265        assert!(schema.is_none());
266
267        Ok(())
268    }
269
270    #[sqlx::test]
271    async fn test_connection_result_set_auto_close(
272        pool_opts: PoolOptions<Exasol>,
273        exa_opts: ExaConnectOptions,
274    ) -> Result<(), BoxDynError> {
275        // Only allow one connection
276        let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
277        let mut conn = pool.acquire().await?;
278        conn.execute("CREATE TABLE CLOSE_RESULTS_TEST ( col DECIMAL(3, 0) );")
279            .await?;
280
281        sqlx::query("INSERT INTO CLOSE_RESULTS_TEST VALUES(?)")
282            .bind(vec![1i8; 10000])
283            .execute(&mut *conn)
284            .await?;
285
286        assert!(conn.ws.pending_close.is_none());
287        let _ = conn
288            .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
289            .try_next()
290            .await?;
291
292        assert!(conn.ws.pending_close.is_some());
293        let _ = conn
294            .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
295            .try_next()
296            .await;
297
298        assert!(conn.ws.pending_close.is_some());
299        let _ = conn
300            .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
301            .try_next()
302            .await;
303
304        assert!(conn.ws.pending_close.is_some());
305        let _ = conn
306            .fetch("SELECT * FROM CLOSE_RESULTS_TEST")
307            .try_next()
308            .await;
309
310        assert!(conn.ws.pending_close.is_some());
311        conn.flush_attributes().await?;
312
313        assert!(conn.ws.pending_close.is_none());
314        Ok(())
315    }
316}