#[cfg(feature = "etl")]
pub mod etl;
mod executor;
pub mod stream;
pub mod websocket;
use std::{
fmt::Write,
net::{SocketAddr, ToSocketAddrs},
};
use futures_util::SinkExt;
use rand::{seq::SliceRandom, thread_rng};
use sqlx_core::{
connection::{Connection, LogSettings},
executor::Executor,
transaction::Transaction,
};
use websocket::{socket::WithExaSocket, ExaWebSocket};
use crate::{
connection::websocket::{
future::{ClosePrepared, Disconnect, SetAttributes, WebSocketFuture},
WithMaybeTlsExaSocket,
},
database::Exasol,
options::ExaConnectOptions,
responses::{ExaAttributes, SessionInfo},
SqlxError, SqlxResult,
};
#[derive(Debug)]
pub struct ExaConnection {
pub(crate) ws: ExaWebSocket,
pub(crate) log_settings: LogSettings,
session_info: SessionInfo,
}
impl ExaConnection {
pub fn server(&self) -> SocketAddr {
self.ws.server()
}
pub fn attributes(&self) -> &ExaAttributes {
&self.ws.attributes
}
pub fn attributes_mut(&mut self) -> &mut ExaAttributes {
&mut self.ws.attributes
}
pub async fn flush_attributes(&mut self) -> SqlxResult<()> {
SetAttributes::default().future(&mut self.ws).await
}
pub fn session_info(&self) -> &SessionInfo {
&self.session_info
}
pub(crate) async fn establish(opts: &ExaConnectOptions) -> SqlxResult<Self> {
let mut error = SqlxError::Configuration("Could not connect to Exasol".into());
let mut resolved = Vec::with_capacity(opts.hosts.len());
for (host, port) in &opts.hosts {
let h = host.clone();
let port = *port;
let sock_addrs =
sqlx_core::rt::spawn_blocking(move || (h.as_ref(), port).to_socket_addrs()).await?;
for sock_addr in sock_addrs {
resolved.push((host, sock_addr));
}
}
resolved.shuffle(&mut thread_rng());
let mut ip_buf = String::new();
for (host, sock_addr) in resolved {
let (ip, port) = (sock_addr.ip(), sock_addr.port());
write!(&mut ip_buf, "{ip}")
.map_err(From::from)
.map_err(SqlxError::Configuration)?;
let wrapper = WithExaSocket(sock_addr);
let with_socket = WithMaybeTlsExaSocket::new(wrapper, host.as_ref(), opts.into());
let socket_res = sqlx_core::net::connect_tcp(&ip_buf, port, with_socket).await;
ip_buf.clear();
let (socket, with_tls) = match socket_res {
Ok(Ok((socket, with_tls))) => (socket, with_tls),
Ok(Err(err)) | Err(err) => {
error = err;
continue;
}
};
match ExaWebSocket::new(host.as_ref(), port, socket, opts.try_into()?, with_tls).await {
Err(err) => error = err,
Ok((ws, session_info)) => {
let mut con = Self {
ws,
log_settings: LogSettings::default(),
session_info,
};
con.configure_session().await?;
return Ok(con);
}
}
}
Err(error)
}
async fn configure_session(&mut self) -> SqlxResult<()> {
self.execute("ALTER SESSION SET HASHTYPE_FORMAT = 'HEX';")
.await?;
Ok(())
}
}
impl Connection for ExaConnection {
type Database = Exasol;
type Options = ExaConnectOptions;
async fn close(mut self) -> SqlxResult<()> {
Disconnect::default().future(&mut self.ws).await?;
self.ws.close().await?;
Ok(())
}
async fn close_hard(mut self) -> SqlxResult<()> {
self.ws.close().await
}
async fn ping(&mut self) -> SqlxResult<()> {
self.ws.ping().await
}
async fn begin(&mut self) -> SqlxResult<Transaction<'_, Self::Database>>
where
Self: Sized,
{
Transaction::begin(self, None).await
}
fn shrink_buffers(&mut self) {}
async fn flush(&mut self) -> SqlxResult<()> {
if let Some(future) = self.ws.pending_close.take() {
future.future(&mut self.ws).await?;
}
if let Some(future) = self.ws.pending_rollback.take() {
future.future(&mut self.ws).await?;
}
Ok(())
}
fn should_flush(&self) -> bool {
self.ws.pending_close.is_some() || self.ws.pending_rollback.is_some()
}
fn cached_statements_size(&self) -> usize
where
Self::Database: sqlx_core::database::HasStatementCache,
{
self.ws.statement_cache.len()
}
async fn clear_cached_statements(&mut self) -> SqlxResult<()>
where
Self::Database: sqlx_core::database::HasStatementCache,
{
while let Some(prep) = self.ws.statement_cache.remove_lru() {
ClosePrepared::new(prep.statement_handle)
.future(&mut self.ws)
.await?;
}
Ok(())
}
}
#[cfg(test)]
#[cfg(feature = "migrate")]
#[allow(clippy::large_futures, reason = "silencing clippy")]
mod tests {
use futures_util::TryStreamExt;
use sqlx::Executor;
use sqlx_core::{error::BoxDynError, pool::PoolOptions};
use crate::{ExaConnectOptions, Exasol};
#[sqlx::test]
async fn test_stmt_cache(
pool_opts: PoolOptions<Exasol>,
mut exa_opts: ExaConnectOptions,
) -> Result<(), BoxDynError> {
exa_opts.statement_cache_capacity = 1;
let pool = pool_opts.connect_with(exa_opts).await?;
let mut con = pool.acquire().await?;
let sql1 = "SELECT 1 FROM dual";
let sql2 = "SELECT 2 FROM dual";
assert!(!con.as_mut().ws.statement_cache.contains_key(sql1));
assert!(!con.as_mut().ws.statement_cache.contains_key(sql2));
sqlx::query(sql1).execute(&mut *con).await?;
assert!(con.as_mut().ws.statement_cache.contains_key(sql1));
assert!(!con.as_mut().ws.statement_cache.contains_key(sql2));
sqlx::query(sql2).execute(&mut *con).await?;
assert!(!con.as_mut().ws.statement_cache.contains_key(sql1));
assert!(con.as_mut().ws.statement_cache.contains_key(sql2));
Ok(())
}
#[sqlx::test]
async fn test_schema_none_selected(
pool_opts: PoolOptions<Exasol>,
mut exa_opts: ExaConnectOptions,
) -> Result<(), BoxDynError> {
exa_opts.schema = None;
let pool = pool_opts.connect_with(exa_opts).await?;
let mut con = pool.acquire().await?;
let schema: Option<String> = sqlx::query_scalar("SELECT CURRENT_SCHEMA")
.fetch_one(&mut *con)
.await?;
assert!(schema.is_none());
Ok(())
}
#[sqlx::test]
async fn test_connection_result_set_auto_close(
pool_opts: PoolOptions<Exasol>,
exa_opts: ExaConnectOptions,
) -> Result<(), BoxDynError> {
let pool = pool_opts.max_connections(1).connect_with(exa_opts).await?;
let mut conn = pool.acquire().await?;
conn.execute("CREATE TABLE CLOSE_RESULTS_TEST ( col DECIMAL(3, 0) );")
.await?;
sqlx::query("INSERT INTO CLOSE_RESULTS_TEST VALUES(?)")
.bind(vec![1i8; 10000])
.execute(&mut *conn)
.await?;
assert!(conn.ws.pending_close.is_none());
let _ = conn
.fetch("SELECT * FROM CLOSE_RESULTS_TEST")
.try_next()
.await?;
assert!(conn.ws.pending_close.is_some());
let _ = conn
.fetch("SELECT * FROM CLOSE_RESULTS_TEST")
.try_next()
.await;
assert!(conn.ws.pending_close.is_some());
let _ = conn
.fetch("SELECT * FROM CLOSE_RESULTS_TEST")
.try_next()
.await;
assert!(conn.ws.pending_close.is_some());
let _ = conn
.fetch("SELECT * FROM CLOSE_RESULTS_TEST")
.try_next()
.await;
assert!(conn.ws.pending_close.is_some());
conn.flush_attributes().await?;
assert!(conn.ws.pending_close.is_none());
Ok(())
}
}