#![expect(
dead_code,
reason = "infrastructure for direct transport creation; accessors used under specific feature combos"
)]
use crate::error::{Error, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TransportType {
Tcp,
#[cfg(unix)]
UnixSocket,
#[cfg(windows)]
NamedPipe,
Grpc,
}
impl TransportType {
#[expect(
clippy::trivially_copy_pass_by_ref,
reason = "signature kept for API consistency with the trait family that unifies Copy and non-Copy implementers"
)]
pub(crate) fn as_str(&self) -> &'static str {
match self {
TransportType::Tcp => "TCP",
#[cfg(unix)]
TransportType::UnixSocket => "Unix Socket",
#[cfg(windows)]
TransportType::NamedPipe => "Named Pipe",
TransportType::Grpc => "gRPC",
}
}
}
impl std::fmt::Display for TransportType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
pub(crate) fn detect_transport_type(endpoint: &str) -> TransportType {
if endpoint.starts_with("https://") || endpoint.starts_with("http://") {
TransportType::Grpc
} else {
#[cfg(unix)]
{
if endpoint.starts_with("tab.domain://") || endpoint.starts_with('/') {
return TransportType::UnixSocket;
}
}
#[cfg(windows)]
{
if endpoint.starts_with("tab.pipe://") || endpoint.starts_with(r"\\") {
return TransportType::NamedPipe;
}
}
TransportType::Tcp
}
}
pub(crate) enum Transport {
Tcp(Box<TcpTransport>),
Grpc(Box<GrpcTransport>),
}
pub(crate) struct TcpTransport {
pub(crate) client: hyperdb_api_core::client::Client,
}
pub(crate) struct GrpcTransport {
pub(crate) client: hyperdb_api_core::client::grpc::GrpcClientSync,
pub(crate) config: hyperdb_api_core::client::grpc::GrpcConfig,
}
fn parse_endpoint(endpoint: &str) -> (&str, u16) {
if let Some(colon_pos) = endpoint.rfind(':') {
let host = &endpoint[..colon_pos];
let port_str = &endpoint[colon_pos + 1..];
let port = port_str.parse::<u16>().unwrap_or(7483);
(host, port)
} else {
(endpoint, 7483)
}
}
impl Transport {
pub(crate) fn transport_type(&self) -> TransportType {
match self {
Transport::Tcp(_) => TransportType::Tcp,
Transport::Grpc(_) => TransportType::Grpc,
}
}
pub(crate) fn supports_writes(&self) -> bool {
match self {
Transport::Tcp(_) => true,
Transport::Grpc(_) => false, }
}
pub(crate) fn connect_tcp(endpoint: &str) -> Result<Self> {
let (host, port) = parse_endpoint(endpoint);
let config = hyperdb_api_core::client::Config::new()
.with_host(host)
.with_port(port);
let client = hyperdb_api_core::client::Client::connect(&config)?;
Ok(Transport::Tcp(Box::new(TcpTransport { client })))
}
pub(crate) fn connect_tcp_with_auth(
endpoint: &str,
user: &str,
password: &str,
) -> Result<Self> {
let (host, port) = parse_endpoint(endpoint);
let config = hyperdb_api_core::client::Config::new()
.with_host(host)
.with_port(port)
.with_user(user)
.with_password(password);
let client = hyperdb_api_core::client::Client::connect(&config)?;
Ok(Transport::Tcp(Box::new(TcpTransport { client })))
}
pub(crate) fn connect_grpc(config: hyperdb_api_core::client::grpc::GrpcConfig) -> Result<Self> {
let client = hyperdb_api_core::client::grpc::GrpcClientSync::connect(config.clone())?;
Ok(Transport::Grpc(Box::new(GrpcTransport { client, config })))
}
pub(crate) fn execute_command(&self, sql: &str) -> Result<u64> {
match self {
Transport::Tcp(tcp) => Ok(tcp.client.exec(sql)?),
Transport::Grpc(_) => Err(Error::new(
"gRPC transport is read-only. Write operations (INSERT, UPDATE, DELETE, DDL) \
are not yet supported over gRPC. Use a TCP connection for write operations.",
)),
}
}
pub(crate) fn execute_query_to_arrow(&self, sql: &str) -> Result<bytes::Bytes> {
match self {
Transport::Tcp(tcp) => {
let copy_query = format!("COPY ({sql}) TO STDOUT WITH (format arrowstream)");
Ok(bytes::Bytes::from(tcp.client.copy_out(©_query)?))
}
Transport::Grpc(grpc) => {
let mut client =
hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
Ok(client.execute_query_to_arrow(sql)?)
}
}
}
}