use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::async_connection::AsyncConnection;
use crate::async_transport::AsyncTransport;
use crate::connection::CreateMode;
use crate::error::{Error, Result};
use crate::transport::{detect_transport_type, TransportType};
use hyperdb_api_core::client::{AsyncClient, Config};
#[derive(Debug, Clone)]
pub struct AsyncConnectionBuilder {
endpoint: String,
database: Option<PathBuf>,
create_mode: CreateMode,
user: Option<String>,
password: Option<String>,
login_timeout: Option<Duration>,
query_timeout: Option<Duration>,
application_name: Option<String>,
transfer_mode: Option<hyperdb_api_core::client::grpc::TransferMode>,
}
impl Default for AsyncConnectionBuilder {
fn default() -> Self {
Self::new("localhost:7483")
}
}
impl AsyncConnectionBuilder {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
database: None,
create_mode: CreateMode::default(),
user: Some("tableau_internal_user".to_string()),
password: None,
login_timeout: None,
query_timeout: None,
application_name: None,
transfer_mode: None,
}
}
#[must_use]
pub fn database(mut self, path: impl AsRef<Path>) -> Self {
self.database = Some(path.as_ref().to_path_buf());
self
}
#[must_use]
pub fn create_mode(mut self, mode: CreateMode) -> Self {
self.create_mode = mode;
self
}
#[must_use]
pub fn user(mut self, user: impl Into<String>) -> Self {
self.user = Some(user.into());
self
}
#[must_use]
pub fn password(mut self, password: impl Into<String>) -> Self {
self.password = Some(password.into());
self
}
#[must_use]
pub fn login_timeout(mut self, timeout: Duration) -> Self {
self.login_timeout = Some(timeout);
self
}
#[must_use]
pub fn query_timeout(mut self, timeout: Duration) -> Self {
self.query_timeout = Some(timeout);
self
}
#[must_use]
pub fn application_name(mut self, name: impl Into<String>) -> Self {
self.application_name = Some(name.into());
self
}
#[must_use]
pub fn auth(mut self, user: impl Into<String>, password: impl Into<String>) -> Self {
self.user = Some(user.into());
self.password = Some(password.into());
self
}
#[must_use]
pub fn create_new_database(mut self, database_path: impl AsRef<Path>) -> Self {
self.database = Some(database_path.as_ref().to_path_buf());
self.create_mode = CreateMode::Create;
self
}
#[must_use]
pub fn create_or_open_database(mut self, database_path: impl AsRef<Path>) -> Self {
self.database = Some(database_path.as_ref().to_path_buf());
self.create_mode = CreateMode::CreateIfNotExists;
self
}
#[must_use]
pub fn open_database(mut self, database_path: impl AsRef<Path>) -> Self {
self.database = Some(database_path.as_ref().to_path_buf());
self.create_mode = CreateMode::DoNotCreate;
self
}
#[must_use]
pub fn transfer_mode(mut self, mode: hyperdb_api_core::client::grpc::TransferMode) -> Self {
self.transfer_mode = Some(mode);
self
}
pub async fn build(self) -> Result<AsyncConnection> {
let transport_type = detect_transport_type(&self.endpoint);
match transport_type {
TransportType::Tcp => self.build_tcp().await,
#[cfg(unix)]
TransportType::UnixSocket => self.build_unix().await,
#[cfg(windows)]
TransportType::NamedPipe => self.build_named_pipe().await,
TransportType::Grpc => self.build_grpc().await,
}
}
async fn build_tcp(self) -> Result<AsyncConnection> {
let mut config: Config = self
.endpoint
.parse()
.map_err(|e| Error::new(format!("invalid endpoint: {e}")))?;
if let Some(user) = &self.user {
config = config.with_user(user);
}
if let Some(password) = &self.password {
config = config.with_password(password);
}
if let Some(ref app_name) = self.application_name {
config = config.with_application_name(app_name);
}
if let Some(timeout) = self.login_timeout {
config = config.with_connect_timeout(timeout);
}
let db_path_str = self
.database
.as_ref()
.map(|p| p.to_string_lossy().to_string());
let client = AsyncClient::connect(&config).await?;
let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
if let Some(db_path) = db_path_str {
conn.handle_creation_mode_public(&db_path, self.create_mode)
.await?;
conn.attach_and_set_path_public(&db_path).await?;
}
Ok(conn)
}
#[cfg(unix)]
async fn build_unix(self) -> Result<AsyncConnection> {
use hyperdb_api_core::client::ConnectionEndpoint;
let socket_path = if self.endpoint.starts_with("tab.domain://") {
let endpoint = ConnectionEndpoint::parse(&self.endpoint)
.map_err(|e| Error::new(format!("invalid Unix socket endpoint: {e}")))?;
match endpoint {
ConnectionEndpoint::DomainSocket { directory, name } => directory.join(&name),
ConnectionEndpoint::Tcp { .. } => {
return Err(Error::new("expected Unix domain socket endpoint"))
}
}
} else {
std::path::PathBuf::from(&self.endpoint)
};
let mut config = Config::new();
if let Some(user) = &self.user {
config = config.with_user(user);
}
if let Some(password) = &self.password {
config = config.with_password(password);
}
let db_path_str = self
.database
.as_ref()
.map(|p| p.to_string_lossy().to_string());
let client = AsyncClient::connect_unix(&socket_path, &config).await?;
let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
if let Some(db_path) = db_path_str {
conn.handle_creation_mode_public(&db_path, self.create_mode)
.await?;
conn.attach_and_set_path_public(&db_path).await?;
}
Ok(conn)
}
#[cfg(windows)]
async fn build_named_pipe(self) -> Result<AsyncConnection> {
use hyperdb_api_core::client::ConnectionEndpoint;
let pipe_path = if self.endpoint.starts_with("tab.pipe://") {
let endpoint = ConnectionEndpoint::parse(&self.endpoint)
.map_err(|e| Error::new(format!("invalid named pipe endpoint: {e}")))?;
match endpoint {
ConnectionEndpoint::NamedPipe { host, name } => {
format!(r"\\{host}\pipe\{name}")
}
_ => return Err(Error::new("expected named pipe endpoint")),
}
} else {
self.endpoint.clone()
};
let mut config = Config::new();
if let Some(user) = &self.user {
config = config.with_user(user);
}
if let Some(password) = &self.password {
config = config.with_password(password);
}
let db_path_str = self
.database
.as_ref()
.map(|p| p.to_string_lossy().to_string());
let client = AsyncClient::connect_named_pipe(&pipe_path, &config).await?;
let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
if let Some(db_path) = db_path_str {
conn.handle_creation_mode_public(&db_path, self.create_mode)
.await?;
conn.attach_and_set_path_public(&db_path).await?;
}
Ok(conn)
}
async fn build_grpc(self) -> Result<AsyncConnection> {
if self.create_mode != CreateMode::DoNotCreate {
return Err(Error::new(
"gRPC transport is read-only. Use CreateMode::DoNotCreate for gRPC connections.",
));
}
let db_path_str = self
.database
.as_ref()
.map(|p| p.to_string_lossy().to_string());
let mut grpc_config = hyperdb_api_core::client::grpc::GrpcConfig::new(&self.endpoint);
if let Some(ref db_path) = db_path_str {
grpc_config = grpc_config.database(db_path);
}
if let Some(mode) = self.transfer_mode {
grpc_config = grpc_config.transfer_mode(mode);
}
let transport = AsyncTransport::connect_grpc(grpc_config).await?;
Ok(AsyncConnection::from_transport(transport, db_path_str))
}
}