use crate::adbc::Statement;
use crate::connection::auth::AuthResponseData;
use crate::connection::params::ConnectionParams;
use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
use crate::error::{ConnectionError, ExasolError, QueryError};
use crate::query::prepared::PreparedStatement;
use crate::query::results::ResultSet;
use crate::transport::protocol::{
ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
QueryResult, TransportProtocol,
};
use crate::transport::WebSocketTransport;
use arrow::array::RecordBatch;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
use tokio::time::timeout;
/// Backwards-compatible alias: older call sites refer to a [`Connection`] as a `Session`.
pub type Session = Connection;
/// Returns the process-wide, lazily created single-thread Tokio runtime that
/// backs the `blocking_*` convenience wrappers on [`Connection`].
///
/// # Panics
/// Panics if the runtime cannot be constructed.
fn blocking_runtime() -> &'static Runtime {
    static BLOCKING_RT: OnceLock<Runtime> = OnceLock::new();
    BLOCKING_RT.get_or_init(|| {
        let built = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build();
        built.expect("Failed to create tokio runtime for blocking operations")
    })
}
/// An authenticated connection to an Exasol database.
pub struct Connection {
    /// Shared, lock-guarded transport; `Arc` so result sets can keep
    /// fetching rows after the query call returns.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Client-side session bookkeeping (state machine, current schema, counters).
    session: SessionInfo,
    /// The parameters this connection was opened with.
    params: ConnectionParams,
}
impl Connection {
/// Connects to Exasol with the given parameters, authenticates, and returns
/// a ready-to-use [`Connection`].
///
/// Optionally opens the configured schema before returning.
///
/// # Errors
/// - [`ConnectionError::ConnectionFailed`] when the WebSocket transport
///   cannot reach the server.
/// - [`ConnectionError::AuthenticationFailed`] when login is rejected.
pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
    let mut transport = WebSocketTransport::new();
    let mut transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
        .with_tls(params.use_tls)
        .with_validate_server_certificate(params.validate_server_certificate)
        .with_timeout(params.connection_timeout.as_millis() as u64);
    if let Some(ref fp) = params.certificate_fingerprint {
        transport_params = transport_params.with_certificate_fingerprint(fp.clone());
    }
    transport.connect(&transport_params).await.map_err(|e| {
        ConnectionError::ConnectionFailed {
            host: params.host.clone(),
            port: params.port,
            message: e.to_string(),
        }
    })?;
    let credentials =
        TransportCredentials::new(params.username.clone(), params.password().to_string());
    let session_info = transport
        .authenticate(&credentials)
        .await
        .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
    let session_config = SessionConfig {
        idle_timeout: params.idle_timeout,
        query_timeout: params.query_timeout,
        ..Default::default()
    };
    let session_id = session_info.session_id.clone();
    let auth_response = AuthResponseData {
        session_id: session_id.clone(),
        protocol_version: session_info.protocol_version,
        release_version: session_info.release_version,
        database_name: session_info.database_name,
        product_name: session_info.product_name,
        max_data_message_size: session_info.max_data_message_size,
        // These values are not reported by the transport's session info, so
        // they are filled in with fixed defaults here.
        // NOTE(review): confirm they match the connected server's limits.
        max_identifier_length: 128,
        max_varchar_length: 2_000_000,
        identifier_quote_string: "\"".to_string(),
        time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
        time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
    };
    let session = SessionInfo::new(session_id, auth_response, session_config);
    // BUG FIX: original read `¶ms.schema` — mojibake for `&params.schema` —
    // which is not valid Rust and would not compile.
    if let Some(schema) = &params.schema {
        session.set_current_schema(Some(schema.clone())).await;
    }
    Ok(Self {
        transport: Arc::new(Mutex::new(transport)),
        session,
        params,
    })
}
/// Returns a fresh [`ConnectionBuilder`] for fluent connection setup.
pub fn builder() -> ConnectionBuilder {
    ConnectionBuilder::default()
}
/// Builds a [`Statement`] for `sql` without executing it.
pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
    Statement::new(sql)
}
/// Executes `stmt` on the server and returns its result set.
///
/// Validates the session is ready, marks it `Executing`, runs the query
/// under the statement's timeout, then restores the session state.
///
/// # Errors
/// `InvalidState` when the session is not ready, `Timeout` when the
/// statement's deadline elapses, `ExecutionFailed` on server errors.
pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
    self.session
        .validate_ready()
        .await
        .map_err(|e| QueryError::InvalidState(e.to_string()))?;
    self.session.set_state(SessionState::Executing).await;
    self.session.increment_query_count();
    let final_sql = stmt.build_sql()?;
    let timeout_duration = Duration::from_millis(stmt.timeout_ms());
    // Clone the Arc so the async block can own its handle to the transport.
    let transport = Arc::clone(&self.transport);
    // The timeout covers both acquiring the transport lock and the query itself.
    let result = timeout(timeout_duration, async move {
        let mut transport_guard = transport.lock().await;
        transport_guard.execute_query(&final_sql).await
    })
    .await
    .map_err(|_| QueryError::Timeout {
        timeout_ms: stmt.timeout_ms(),
    })?
    .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
    // NOTE(review): on the timeout/error paths above, this reset is skipped
    // and the session stays in `Executing` — confirm whether that is intended.
    self.update_session_state_after_query().await;
    ResultSet::from_transport_result(result, Arc::clone(&self.transport))
}
/// Executes `stmt` and returns the affected-row count, failing when the
/// server answered with a result set instead.
pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
    match self.execute_statement(stmt).await?.row_count() {
        Some(count) => Ok(count),
        None => Err(QueryError::NoResultSet(
            "Expected row count, got result set".to_string(),
        )),
    }
}
/// Creates a server-side prepared statement for `sql`.
///
/// # Errors
/// `InvalidState` when the session is not ready; `ExecutionFailed` when the
/// server rejects the statement.
pub async fn prepare(
    &mut self,
    sql: impl Into<String>,
) -> Result<PreparedStatement, QueryError> {
    let sql = sql.into();
    self.session
        .validate_ready()
        .await
        .map_err(|e| QueryError::InvalidState(e.to_string()))?;
    let mut transport = self.transport.lock().await;
    let handle = transport
        .create_prepared_statement(&sql)
        .await
        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
    Ok(PreparedStatement::new(handle))
}
/// Executes a prepared statement with its currently bound parameters and
/// returns the result set.
///
/// Unlike `execute_statement`, no timeout is applied here.
///
/// # Errors
/// `StatementClosed` when `stmt` was already closed, `InvalidState` when the
/// session is not ready, `ExecutionFailed` on server errors.
pub async fn execute_prepared(
    &mut self,
    stmt: &PreparedStatement,
) -> Result<ResultSet, QueryError> {
    if stmt.is_closed() {
        return Err(QueryError::StatementClosed);
    }
    self.session
        .validate_ready()
        .await
        .map_err(|e| QueryError::InvalidState(e.to_string()))?;
    self.session.set_state(SessionState::Executing).await;
    self.session.increment_query_count();
    let params_data = stmt.build_parameters_data()?;
    let mut transport = self.transport.lock().await;
    let result = transport
        .execute_prepared_statement(stmt.handle_ref(), params_data)
        .await
        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
    // Release the transport lock before touching session state.
    drop(transport);
    self.update_session_state_after_query().await;
    ResultSet::from_transport_result(result, Arc::clone(&self.transport))
}
/// Executes a prepared statement expected to return a row count (INSERT,
/// UPDATE, DELETE, DDL).
///
/// # Errors
/// `StatementClosed` when `stmt` was already closed, `InvalidState` when the
/// session is not ready, `ExecutionFailed` on server errors, and
/// `UnexpectedResultSet` when the server returned rows instead of a count.
pub async fn execute_prepared_update(
    &mut self,
    stmt: &PreparedStatement,
) -> Result<i64, QueryError> {
    if stmt.is_closed() {
        return Err(QueryError::StatementClosed);
    }
    self.session
        .validate_ready()
        .await
        .map_err(|e| QueryError::InvalidState(e.to_string()))?;
    self.session.set_state(SessionState::Executing).await;
    // FIX: count this execution like execute_statement/execute_prepared do;
    // the original omitted it, under-reporting the session's query count.
    self.session.increment_query_count();
    let params_data = stmt.build_parameters_data()?;
    let mut transport = self.transport.lock().await;
    let result = transport
        .execute_prepared_statement(stmt.handle_ref(), params_data)
        .await
        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
    // Release the transport lock before touching session state.
    drop(transport);
    self.update_session_state_after_query().await;
    match result {
        QueryResult::RowCount { count } => Ok(count),
        QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
    }
}
/// Closes a prepared statement on the server; a no-op if already closed.
pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
    if stmt.is_closed() {
        return Ok(());
    }
    let mut transport = self.transport.lock().await;
    transport
        .close_prepared_statement(stmt.handle_ref())
        .await
        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
    stmt.mark_closed();
    Ok(())
}
/// Convenience wrapper: builds a statement from `sql` and executes it.
pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
    let statement = self.create_statement(sql);
    self.execute_statement(&statement).await
}
/// Executes `sql` and eagerly fetches every row as Arrow record batches.
pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
    let results = self.execute(sql).await?;
    results.fetch_all().await
}
/// Convenience wrapper: executes `sql` and returns the affected-row count.
pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
    let statement = self.create_statement(sql);
    self.execute_statement_update(&statement).await
}
/// Starts a transaction by disabling autocommit on the transport and
/// recording the transaction in the session state.
pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
    self.transport
        .lock()
        .await
        .set_autocommit(false)
        .await
        .map_err(|e| QueryError::TransactionError(e.to_string()))?;
    self.session
        .begin_transaction()
        .await
        .map_err(|e| QueryError::TransactionError(e.to_string()))?;
    Ok(())
}
/// Commits the current transaction; a no-op when no transaction is open.
pub async fn commit(&mut self) -> Result<(), QueryError> {
    if !self.in_transaction() {
        return Ok(());
    }
    self.execute_update("COMMIT").await?;
    self.session
        .commit_transaction()
        .await
        .map_err(|e| QueryError::TransactionError(e.to_string()))?;
    Ok(())
}
/// Rolls back the current transaction; a no-op when no transaction is open.
pub async fn rollback(&mut self) -> Result<(), QueryError> {
    if !self.in_transaction() {
        return Ok(());
    }
    self.execute_update("ROLLBACK").await?;
    self.session
        .rollback_transaction()
        .await
        .map_err(|e| QueryError::TransactionError(e.to_string()))?;
    Ok(())
}
/// Returns `true` while an explicit transaction is open.
pub fn in_transaction(&self) -> bool {
    self.session.in_transaction()
}
/// Returns the schema the session currently has open, if any.
pub async fn current_schema(&self) -> Option<String> {
    self.session.current_schema().await
}
/// Opens `schema` on the server and records it in the session state.
///
/// NOTE(review): the schema name is interpolated into the SQL unquoted and
/// unescaped — a name containing spaces/quotes or attacker-controlled input
/// would break or inject SQL. Consider quoting it as an identifier.
pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
    let schema_name = schema.into();
    self.execute_update(format!("OPEN SCHEMA {}", schema_name))
        .await?;
    self.session.set_current_schema(Some(schema_name)).await;
    Ok(())
}
/// Lists "catalogs" — Exasol has no real catalogs, so schema names from
/// SYS.EXA_ALL_SCHEMAS are surfaced under a CATALOG_NAME column.
pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
    self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
        .await
}
/// Lists schemas, optionally filtered to an exact name (the `catalog`
/// argument is treated as a schema-name filter). Single quotes in the
/// filter are escaped by doubling.
pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
    let sql = match catalog {
        Some(cat) => format!(
            "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
            cat.replace('\'', "''")
        ),
        None => {
            "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
        }
    };
    self.execute(sql).await
}
/// Lists tables and views from SYS.EXA_ALL_OBJECTS, optionally filtered by
/// schema (ROOT_NAME) and/or object name. Quote characters in filters are
/// escaped by doubling.
///
/// The `catalog` argument is accepted for API symmetry but deliberately
/// ignored — Exasol has no catalog level.
pub async fn get_tables(
    &mut self,
    catalog: Option<&str>,
    schema: Option<&str>,
    table: Option<&str>,
) -> Result<ResultSet, QueryError> {
    let mut conditions = vec!["OBJECT_TYPE IN ('TABLE', 'VIEW')".to_string()];
    let _ = catalog;
    if let Some(sch) = schema {
        conditions.push(format!("ROOT_NAME = '{}'", sch.replace('\'', "''")));
    }
    if let Some(tbl) = table {
        conditions.push(format!("OBJECT_NAME = '{}'", tbl.replace('\'', "''")));
    }
    let where_clause = format!("WHERE {}", conditions.join(" AND "));
    let sql = format!(
        "SELECT ROOT_NAME AS TABLE_SCHEMA, OBJECT_NAME AS TABLE_NAME, OBJECT_TYPE AS TABLE_TYPE FROM SYS.EXA_ALL_OBJECTS {} ORDER BY ROOT_NAME, OBJECT_NAME",
        where_clause
    );
    self.execute(sql).await
}
/// Lists columns from SYS.EXA_ALL_COLUMNS with optional equality filters.
/// Quote characters in filters are escaped by doubling.
///
/// NOTE(review): both `catalog` and `schema` filter the same
/// COLUMN_SCHEMA column — passing different values for both yields an
/// always-false predicate. Confirm whether `catalog` should be ignored
/// here as it is in `get_tables`.
pub async fn get_columns(
    &mut self,
    catalog: Option<&str>,
    schema: Option<&str>,
    table: Option<&str>,
    column: Option<&str>,
) -> Result<ResultSet, QueryError> {
    let mut conditions = Vec::new();
    if let Some(cat) = catalog {
        conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
    }
    if let Some(sch) = schema {
        conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
    }
    if let Some(tbl) = table {
        conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
    }
    if let Some(col) = column {
        conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
    }
    let where_clause = if conditions.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", conditions.join(" AND "))
    };
    let sql = format!(
        "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
        FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
        where_clause
    );
    self.execute(sql).await
}
/// Returns the server-assigned session identifier.
pub fn session_id(&self) -> &str {
    self.session.session_id()
}
/// Returns the parameters this connection was opened with.
pub fn params(&self) -> &ConnectionParams {
    &self.params
}
/// Returns `true` once the session has been closed.
pub async fn is_closed(&self) -> bool {
    self.session.is_closed().await
}
/// Closes the session and the underlying transport, consuming the connection.
pub async fn close(self) -> Result<(), ConnectionError> {
    self.session.close().await?;
    let mut transport = self.transport.lock().await;
    transport
        .close()
        .await
        .map_err(|e| ConnectionError::ConnectionFailed {
            host: self.params.host.clone(),
            port: self.params.port,
            message: e.to_string(),
        })?;
    Ok(())
}
/// Closes the session and transport without consuming the connection —
/// same steps as `close`, but usable behind a shared reference
/// (e.g. from `Drop`-adjacent or pooled contexts).
pub async fn shutdown(&self) -> Result<(), ConnectionError> {
    self.session.close().await?;
    let mut transport = self.transport.lock().await;
    transport
        .close()
        .await
        .map_err(|e| ConnectionError::ConnectionFailed {
            host: self.params.host.clone(),
            port: self.params.port,
            message: e.to_string(),
        })?;
    Ok(())
}
/// Builds a reusable closure that runs one SQL string on the shared
/// transport and yields the affected-row count; the import helpers own
/// this executor instead of borrowing `self`.
///
/// A `ResultSet` reply is mapped to `Ok(0)` — import statements are only
/// expected to produce row counts.
fn make_sql_executor(
    &self,
) -> impl Fn(
    String,
)
    -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
    let transport = Arc::clone(&self.transport);
    move |sql: String| {
        // Each invocation gets its own Arc handle so the future is 'static.
        let transport = Arc::clone(&transport);
        Box::pin(async move {
            let mut transport_guard = transport.lock().await;
            match transport_guard.execute_query(&sql).await {
                Ok(QueryResult::RowCount { count }) => Ok(count as u64),
                Ok(QueryResult::ResultSet { .. }) => Ok(0),
                Err(e) => Err(e.to_string()),
            }
        })
    }
}
/// Imports a CSV file into `table`; returns the number of rows imported.
pub async fn import_csv_from_file(
    &mut self,
    table: &str,
    file_path: &std::path::Path,
    options: crate::import::csv::CsvImportOptions,
) -> Result<u64, crate::import::ImportError> {
    // Point the import machinery at this connection's server endpoint.
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
        .await
}
/// Imports CSV data from an async reader into `table`; returns rows imported.
pub async fn import_csv_from_stream<R>(
    &mut self,
    table: &str,
    reader: R,
    options: crate::import::csv::CsvImportOptions,
) -> Result<u64, crate::import::ImportError>
where
    R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
        .await
}
/// Imports rows from an in-memory iterator of string fields into `table`.
pub async fn import_csv_from_iter<I, T, S>(
    &mut self,
    table: &str,
    rows: I,
    options: crate::import::csv::CsvImportOptions,
) -> Result<u64, crate::import::ImportError>
where
    I: IntoIterator<Item = T> + Send + 'static,
    T: IntoIterator<Item = S> + Send,
    S: AsRef<str>,
{
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
}
/// Exports `source` as CSV into a file; returns the number of rows written.
pub async fn export_csv_to_file(
    &mut self,
    source: crate::query::export::ExportSource,
    file_path: &std::path::Path,
    options: crate::export::csv::CsvExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    // Export drives the transport directly, so it holds the lock throughout.
    let mut transport_guard = self.transport.lock().await;
    crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
}
/// Exports `source` as CSV into an async writer; returns rows written.
pub async fn export_csv_to_stream<W>(
    &mut self,
    source: crate::query::export::ExportSource,
    writer: W,
    options: crate::export::csv::CsvExportOptions,
) -> Result<u64, crate::export::csv::ExportError>
where
    W: tokio::io::AsyncWrite + Unpin,
{
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    let mut transport_guard = self.transport.lock().await;
    crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
}
/// Exports `source` and returns the rows in memory as string fields.
pub async fn export_csv_to_list(
    &mut self,
    source: crate::query::export::ExportSource,
    options: crate::export::csv::CsvExportOptions,
) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    let mut transport_guard = self.transport.lock().await;
    crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
}
/// Imports several CSV files into `table` in one operation.
pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
    &mut self,
    table: &str,
    paths: S,
    options: crate::import::csv::CsvImportOptions,
) -> Result<u64, crate::import::ImportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
}
/// Imports a Parquet file into `table`; returns rows imported.
pub async fn import_from_parquet(
    &mut self,
    table: &str,
    file_path: &std::path::Path,
    options: crate::import::parquet::ParquetImportOptions,
) -> Result<u64, crate::import::ImportError> {
    let options = options
        .with_exasol_host(&self.params.host)
        .with_exasol_port(self.params.port);
    crate::import::parquet::import_from_parquet(
        self.make_sql_executor(),
        table,
        file_path,
        options,
    )
    .await
}
/// Imports several Parquet files into `table` in one operation.
pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
    &mut self,
    table: &str,
    paths: S,
    options: crate::import::parquet::ParquetImportOptions,
) -> Result<u64, crate::import::ImportError> {
    let options = options
        .with_exasol_host(&self.params.host)
        .with_exasol_port(self.params.port);
    crate::import::parquet::import_from_parquet_files(
        self.make_sql_executor(),
        table,
        paths,
        options,
    )
    .await
}
/// Exports `source` into a Parquet file; returns rows written.
pub async fn export_to_parquet(
    &mut self,
    source: crate::query::export::ExportSource,
    file_path: &std::path::Path,
    options: crate::export::parquet::ParquetExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    let mut transport_guard = self.transport.lock().await;
    crate::export::parquet::export_to_parquet_via_transport(
        &mut *transport_guard,
        source,
        file_path,
        options,
    )
    .await
}
/// Imports a single Arrow [`RecordBatch`] into `table`; returns rows imported.
pub async fn import_from_record_batch(
    &mut self,
    table: &str,
    batch: &RecordBatch,
    options: crate::import::arrow::ArrowImportOptions,
) -> Result<u64, crate::import::ImportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::arrow::import_from_record_batch(
        self.make_sql_executor(),
        table,
        batch,
        options,
    )
    .await
}
/// Imports a sequence of Arrow record batches into `table`.
pub async fn import_from_record_batches<I>(
    &mut self,
    table: &str,
    batches: I,
    options: crate::import::arrow::ArrowImportOptions,
) -> Result<u64, crate::import::ImportError>
where
    I: IntoIterator<Item = RecordBatch>,
{
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::arrow::import_from_record_batches(
        self.make_sql_executor(),
        table,
        batches,
        options,
    )
    .await
}
/// Imports Arrow IPC data from an async reader into `table`.
pub async fn import_from_arrow_ipc<R>(
    &mut self,
    table: &str,
    reader: R,
    options: crate::import::arrow::ArrowImportOptions,
) -> Result<u64, crate::import::ImportError>
where
    R: tokio::io::AsyncRead + Unpin + Send,
{
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    crate::import::arrow::import_from_arrow_ipc(
        self.make_sql_executor(),
        table,
        reader,
        options,
    )
    .await
}
/// Exports `source` and returns the result as Arrow record batches.
pub async fn export_to_record_batches(
    &mut self,
    source: crate::query::export::ExportSource,
    options: crate::export::arrow::ArrowExportOptions,
) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    let mut transport_guard = self.transport.lock().await;
    crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
}
/// Exports `source` into an Arrow IPC file; returns rows written.
pub async fn export_to_arrow_ipc(
    &mut self,
    source: crate::query::export::ExportSource,
    file_path: &std::path::Path,
    options: crate::export::arrow::ArrowExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    let options = options
        .exasol_host(&self.params.host)
        .exasol_port(self.params.port);
    let mut transport_guard = self.transport.lock().await;
    crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
        .await
}
/// Blocking wrapper around [`Connection::import_csv_from_file`].
/// Blocks the calling thread on a shared runtime; NOTE(review): calling
/// this from inside an async runtime will likely panic — confirm usage.
pub fn blocking_import_csv_from_file(
    &mut self,
    table: &str,
    file_path: &std::path::Path,
    options: crate::import::csv::CsvImportOptions,
) -> Result<u64, crate::import::ImportError> {
    blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
}
/// Blocking wrapper around [`Connection::import_csv_from_files`].
pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
    &mut self,
    table: &str,
    paths: S,
    options: crate::import::csv::CsvImportOptions,
) -> Result<u64, crate::import::ImportError> {
    blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
}
/// Blocking wrapper around [`Connection::import_from_parquet`].
pub fn blocking_import_from_parquet(
    &mut self,
    table: &str,
    file_path: &std::path::Path,
    options: crate::import::parquet::ParquetImportOptions,
) -> Result<u64, crate::import::ImportError> {
    blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
}
/// Blocking wrapper around [`Connection::import_parquet_from_files`].
pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
    &mut self,
    table: &str,
    paths: S,
    options: crate::import::parquet::ParquetImportOptions,
) -> Result<u64, crate::import::ImportError> {
    blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
}
/// Blocking wrapper around [`Connection::import_from_record_batch`].
pub fn blocking_import_from_record_batch(
    &mut self,
    table: &str,
    batch: &RecordBatch,
    options: crate::import::arrow::ArrowImportOptions,
) -> Result<u64, crate::import::ImportError> {
    blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
}
/// Blocking wrapper that opens `file_path` and imports its Arrow IPC
/// contents into `table`.
pub fn blocking_import_from_arrow_ipc(
    &mut self,
    table: &str,
    file_path: &std::path::Path,
    options: crate::import::arrow::ArrowImportOptions,
) -> Result<u64, crate::import::ImportError> {
    blocking_runtime().block_on(async {
        // The file must be opened inside the runtime so tokio's async File works.
        let file = tokio::fs::File::open(file_path)
            .await
            .map_err(crate::import::ImportError::IoError)?;
        self.import_from_arrow_ipc(table, file, options).await
    })
}
/// Blocking wrapper around [`Connection::export_csv_to_file`].
pub fn blocking_export_csv_to_file(
    &mut self,
    source: crate::query::export::ExportSource,
    file_path: &std::path::Path,
    options: crate::export::csv::CsvExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
}
/// Blocking wrapper around [`Connection::export_to_parquet`].
pub fn blocking_export_to_parquet(
    &mut self,
    source: crate::query::export::ExportSource,
    file_path: &std::path::Path,
    options: crate::export::parquet::ParquetExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
}
/// Blocking wrapper around [`Connection::export_to_record_batches`].
pub fn blocking_export_to_record_batches(
    &mut self,
    source: crate::query::export::ExportSource,
    options: crate::export::arrow::ArrowExportOptions,
) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
    blocking_runtime().block_on(self.export_to_record_batches(source, options))
}
/// Blocking wrapper around [`Connection::export_to_arrow_ipc`].
pub fn blocking_export_to_arrow_ipc(
    &mut self,
    source: crate::query::export::ExportSource,
    file_path: &std::path::Path,
    options: crate::export::arrow::ArrowExportOptions,
) -> Result<u64, crate::export::csv::ExportError> {
    blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
}
/// Returns the session to its resting state after a query completes and
/// refreshes the last-activity timestamp.
async fn update_session_state_after_query(&self) {
    let next_state = if self.session.in_transaction() {
        SessionState::InTransaction
    } else {
        SessionState::Ready
    };
    self.session.set_state(next_state).await;
    self.session.update_activity().await;
}
}
// Manual Debug impl: shows connection identity without dumping the
// transport or credentials (password is deliberately omitted).
impl std::fmt::Debug for Connection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Connection")
            .field("session_id", &self.session.session_id())
            .field("host", &self.params.host)
            .field("port", &self.params.port)
            .field("username", &self.params.username)
            .field("in_transaction", &self.session.in_transaction())
            .finish()
    }
}
/// Fluent builder for [`Connection`]; thin facade over the params builder.
pub struct ConnectionBuilder {
    // Delegates all option storage/validation to the params-level builder.
    params_builder: crate::connection::params::ConnectionBuilder,
}
impl ConnectionBuilder {
    /// Creates a builder with nothing configured yet.
    pub fn new() -> Self {
        let params_builder = crate::connection::params::ConnectionBuilder::new();
        Self { params_builder }
    }
    /// Sets the Exasol host name or address.
    pub fn host(self, host: &str) -> Self {
        Self { params_builder: self.params_builder.host(host) }
    }
    /// Sets the Exasol port.
    pub fn port(self, port: u16) -> Self {
        Self { params_builder: self.params_builder.port(port) }
    }
    /// Sets the login user name.
    pub fn username(self, username: &str) -> Self {
        Self { params_builder: self.params_builder.username(username) }
    }
    /// Sets the login password.
    pub fn password(self, password: &str) -> Self {
        Self { params_builder: self.params_builder.password(password) }
    }
    /// Sets the schema to open after connecting.
    pub fn schema(self, schema: &str) -> Self {
        Self { params_builder: self.params_builder.schema(schema) }
    }
    /// Enables or disables TLS for the connection.
    pub fn use_tls(self, use_tls: bool) -> Self {
        Self { params_builder: self.params_builder.use_tls(use_tls) }
    }
    /// Validates the collected parameters and opens the connection.
    pub async fn connect(self) -> Result<Connection, ExasolError> {
        let params = self.params_builder.build()?;
        let connection = Connection::from_params(params).await?;
        Ok(connection)
    }
}
// `Default` mirrors `new()` so the builder works with `..Default::default()`
// style construction and derive-based consumers.
impl Default for ConnectionBuilder {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Compile-time smoke test: the builder's fluent API accepts every setter.
    #[test]
    fn test_connection_builder() {
        let _builder = ConnectionBuilder::new()
            .host("localhost")
            .port(8563)
            .username("test")
            .password("secret")
            .schema("MY_SCHEMA")
            .use_tls(false);
    }
    // Intentionally empty body: the test's existence asserts (at compile time)
    // that `create_statement` needs no async context.
    #[test]
    fn test_create_statement_is_sync() {
    }
}
#[cfg(test)]
mod blocking_tests {
    use super::*;
    // These tests are signature checks: coercing a method to a concrete `fn`
    // pointer fails to compile if the method is removed or its shape changes.
    #[test]
    fn test_session_type_alias_exists() {
        fn takes_session(_session: &Session) {}
        fn takes_connection(_connection: &Connection) {}
        fn verify_interchangeable<F1, F2>(f1: F1, f2: F2)
        where
            F1: Fn(&Session),
            F2: Fn(&Connection),
        {
            let _ = (f1, f2);
        }
        verify_interchangeable(takes_session, takes_connection);
    }
    #[test]
    fn test_blocking_runtime_exists() {
        let runtime = blocking_runtime();
        let _ = runtime.handle();
    }
    #[test]
    fn test_connection_has_blocking_import_csv() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &std::path::Path,
                crate::import::csv::CsvImportOptions,
            ) -> Result<u64, crate::import::ImportError> =
                Connection::blocking_import_csv_from_file;
        }
    }
    #[test]
    fn test_connection_has_blocking_import_parquet() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &std::path::Path,
                crate::import::parquet::ParquetImportOptions,
            ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
        }
    }
    #[test]
    fn test_connection_has_blocking_import_record_batch() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &RecordBatch,
                crate::import::arrow::ArrowImportOptions,
            ) -> Result<u64, crate::import::ImportError> =
                Connection::blocking_import_from_record_batch;
        }
    }
    #[test]
    fn test_connection_has_blocking_import_arrow_ipc() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &std::path::Path,
                crate::import::arrow::ArrowImportOptions,
            ) -> Result<u64, crate::import::ImportError> =
                Connection::blocking_import_from_arrow_ipc;
        }
    }
    #[test]
    fn test_connection_has_blocking_export_csv() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                &std::path::Path,
                crate::export::csv::CsvExportOptions,
            ) -> Result<u64, crate::export::csv::ExportError> =
                Connection::blocking_export_csv_to_file;
        }
    }
    #[test]
    fn test_connection_has_blocking_export_parquet() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                &std::path::Path,
                crate::export::parquet::ParquetExportOptions,
            ) -> Result<u64, crate::export::csv::ExportError> =
                Connection::blocking_export_to_parquet;
        }
    }
    #[test]
    fn test_connection_has_blocking_export_record_batches() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                crate::export::arrow::ArrowExportOptions,
            ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
                Connection::blocking_export_to_record_batches;
        }
    }
    #[test]
    fn test_connection_has_blocking_export_arrow_ipc() {
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                &std::path::Path,
                crate::export::arrow::ArrowExportOptions,
            ) -> Result<u64, crate::export::csv::ExportError> =
                Connection::blocking_export_to_arrow_ipc;
        }
    }
}