use std::path::Path;
use hyperdb_api_core::client::Client;
use crate::error::{Error, Result};
use crate::names::escape_sql_path;
use crate::process::HyperProcess;
use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
use crate::transport::Transport;
use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::query_stats::{QueryStats, QueryStatsProvider};
pub trait ScalarValue: Sized {
fn from_row(row: &Row, col: usize) -> Option<Self>;
}
impl ScalarValue for i64 {
fn from_row(row: &Row, col: usize) -> Option<Self> {
row.get_i64(col)
}
}
impl ScalarValue for i32 {
fn from_row(row: &Row, col: usize) -> Option<Self> {
row.get_i32(col)
}
}
impl ScalarValue for i16 {
fn from_row(row: &Row, col: usize) -> Option<Self> {
row.get_i16(col)
}
}
impl ScalarValue for f64 {
fn from_row(row: &Row, col: usize) -> Option<Self> {
row.get_f64(col)
}
}
impl ScalarValue for bool {
fn from_row(row: &Row, col: usize) -> Option<Self> {
row.get_bool(col)
}
}
impl ScalarValue for String {
fn from_row(row: &Row, col: usize) -> Option<Self> {
row.get_string(col)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CreateMode {
#[default]
DoNotCreate,
Create,
CreateIfNotExists,
CreateAndReplace,
}
pub struct Connection {
transport: Transport,
database: Option<String>,
stats_provider: Option<Arc<dyn QueryStatsProvider>>,
pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
}
impl std::fmt::Debug for Connection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Connection")
.field("database", &self.database)
.finish_non_exhaustive()
}
}
impl Connection {
pub fn new(
instance: &HyperProcess,
database_path: impl AsRef<Path>,
create_mode: CreateMode,
) -> Result<Self> {
if let Some(conn_endpoint) = instance.connection_endpoint() {
return Self::connect_with_endpoint(
conn_endpoint,
&database_path.as_ref().to_string_lossy(),
create_mode,
);
}
let endpoint = instance.require_endpoint()?;
Self::connect(
endpoint,
&database_path.as_ref().to_string_lossy(),
create_mode,
)
}
fn connect_with_endpoint(
endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
database_path: &str,
create_mode: CreateMode,
) -> Result<Self> {
let db_path_str = Some(database_path.to_string());
let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
let conn = Connection::from_client(client, db_path_str.clone());
if let Some(db_path) = db_path_str {
conn.handle_creation_mode(&db_path, create_mode)?;
conn.attach_and_set_path(&db_path)?;
}
Ok(conn)
}
pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
crate::ConnectionBuilder::new(endpoint)
.database(database_path)
.create_mode(create_mode)
.build()
}
#[must_use]
pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
crate::ConnectionBuilder::new(endpoint)
}
pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
Connection {
transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
database,
stats_provider: None,
pending_stats: Mutex::new(None),
}
}
#[allow(
dead_code,
reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
)]
pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
Connection {
transport,
database,
stats_provider: None,
pending_stats: Mutex::new(None),
}
}
pub fn transport_type(&self) -> &'static str {
self.transport.transport_type().as_str()
}
pub fn supports_writes(&self) -> bool {
self.transport.supports_writes()
}
pub(crate) fn handle_creation_mode(
&self,
database_path: &str,
create_mode: CreateMode,
) -> Result<()> {
match create_mode {
CreateMode::DoNotCreate => {}
CreateMode::Create => {
self.execute_command(&format!(
"CREATE DATABASE {}",
escape_sql_path(database_path)
))?;
}
CreateMode::CreateIfNotExists => {
if let Err(e) = self.execute_command(&format!(
"CREATE DATABASE IF NOT EXISTS {}",
escape_sql_path(database_path)
)) {
if !is_already_exists_error(&e) {
return Err(Error::with_cause(
format!("Failed to create database '{database_path}': {e}"),
e,
));
}
}
}
CreateMode::CreateAndReplace => {
let _ = self.execute_command(&format!(
"DROP DATABASE IF EXISTS {}",
escape_sql_path(database_path)
));
self.execute_command(&format!(
"CREATE DATABASE {}",
escape_sql_path(database_path)
))?;
}
}
Ok(())
}
pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
let db_alias = std::path::Path::new(database_path)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("db");
self.execute_command(&format!(
"ATTACH DATABASE {} AS {}",
escape_sql_path(database_path),
escape_sql_path(db_alias)
))?;
self.execute_command(&format!(
"SET search_path TO {}, public",
escape_sql_path(db_alias)
))?;
Ok(())
}
pub fn connect_with_auth(
endpoint: &str,
database_path: &str,
create_mode: CreateMode,
user: &str,
password: &str,
) -> Result<Self> {
crate::ConnectionBuilder::new(endpoint)
.database(database_path)
.create_mode(create_mode)
.user(user.to_string())
.password(password)
.build()
}
pub fn without_database(endpoint: &str) -> Result<Self> {
crate::ConnectionBuilder::new(endpoint).build()
}
pub fn execute_command(&self, command: &str) -> Result<u64> {
let token = self.stats_before_query(command);
let result = self.transport.execute_command(command);
self.stats_store_pending(token, command);
result
}
pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
let token = self.stats_before_query(query);
let result = match &self.transport {
Transport::Tcp(tcp) => {
let stream = tcp
.client
.query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
Ok(Rowset::new(stream))
}
Transport::Grpc(grpc) => {
let mut client =
hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
let stream = client.execute_query_stream(query)?;
let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
Ok(Rowset::from_arrow(arrow_rowset))
}
};
self.stats_store_pending(token, query);
result
}
pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
self.transport.execute_query_to_arrow(select_query)
}
pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
}
pub fn execute_query_to_batches(
&self,
select_query: &str,
) -> Result<Vec<arrow::record_batch::RecordBatch>> {
let arrow_data = self.execute_query_to_arrow(select_query)?;
crate::arrow_result::parse_arrow_ipc(arrow_data)
}
pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
where
Q: AsRef<str>,
{
let query = query.as_ref();
let result = self.execute_query(query)?;
result.require_first_row()
}
pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
where
Q: AsRef<str>,
{
let query = query.as_ref();
let result = self.execute_query(query)?;
result.first_row()
}
pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
where
Q: AsRef<str>,
{
let query = query.as_ref();
let result = self.execute_query(query)?;
result.collect_rows()
}
pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
let row = self.fetch_one(query)?;
T::from_row(&row)
}
pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
let rows = self.fetch_all(query)?;
rows.iter().map(|r| T::from_row(r)).collect()
}
pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
where
T: crate::connection::ScalarValue + crate::result::RowValue,
Q: AsRef<str>,
{
let query = query.as_ref();
let result = self.execute_query(query)?;
result.require_scalar()
}
pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
where
T: crate::connection::ScalarValue + crate::result::RowValue,
Q: AsRef<str>,
{
let query = query.as_ref();
let result = self.execute_query(query)?;
result.scalar()
}
#[inline]
pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
where
T: ScalarValue + crate::result::RowValue,
{
self.fetch_optional_scalar(query)
}
pub fn query_count(&self, query: &str) -> Result<i64> {
self.fetch_optional_scalar::<i64, _>(query)
.map(|opt| opt.unwrap_or(0))
}
pub fn query_params(
&self,
query: &str,
params: &[&dyn crate::params::ToSqlParam],
) -> Result<Rowset<'_>> {
let client = match &self.transport {
Transport::Tcp(tcp) => &tcp.client,
Transport::Grpc(_) => {
return Err(Error::new(
"prepared statements are not supported over gRPC transport",
));
}
};
let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
let stmt = client.prepare_typed(query, &oids)?;
let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
let stream =
client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
}
pub fn command_params(
&self,
query: &str,
params: &[&dyn crate::params::ToSqlParam],
) -> Result<u64> {
let client = match &self.transport {
Transport::Tcp(tcp) => &tcp.client,
Transport::Grpc(_) => {
return Err(Error::new(
"prepared statements are not supported over gRPC transport",
));
}
};
let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
let stmt = client.prepare_typed(query, &oids)?;
let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
Ok(client.execute_no_result(&stmt, encoded)?)
}
pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
let mut total = 0u64;
for (i, stmt) in statements.iter().enumerate() {
if !stmt.trim().is_empty() {
total += self.execute_command(stmt).map_err(|e| {
let preview: String = stmt.chars().take(80).collect();
Error::with_cause(
format!(
"execute_batch failed at statement {} of {}: {}",
i + 1,
statements.len(),
preview,
),
e,
)
})?;
}
}
Ok(total)
}
pub fn database(&self) -> Option<&str> {
self.database.as_deref()
}
pub fn create_database(&self, path: &str) -> Result<()> {
let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
self.execute_command(&sql)?;
Ok(())
}
pub fn drop_database(&self, path: &str) -> Result<()> {
let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
self.execute_command(&sql)?;
Ok(())
}
pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
let sql = if let Some(alias) = alias {
format!(
"ATTACH DATABASE {} AS {}",
escape_sql_path(path),
escape_sql_path(alias)
)
} else {
format!("ATTACH DATABASE {}", escape_sql_path(path))
};
self.execute_command(&sql)?;
Ok(())
}
pub fn detach_database(&self, alias: &str) -> Result<()> {
let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
self.execute_command(&sql)?;
Ok(())
}
pub fn detach_all_databases(&self) -> Result<()> {
self.execute_command("DETACH ALL DATABASES")?;
Ok(())
}
pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
crate::catalog::Catalog::new(self).create_schema(schema_name)
}
pub fn has_schema<T>(&self, schema: T) -> Result<bool>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
use crate::catalog::Catalog;
Catalog::new(self).has_schema(schema)
}
pub fn has_table<T>(&self, table_name: T) -> Result<bool>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
use crate::catalog::Catalog;
Catalog::new(self).has_table(table_name)
}
pub fn server_version(&self) -> Option<crate::ServerVersion> {
let version_str = self.parameter_status("server_version")?;
crate::ServerVersion::parse(&version_str)
}
pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
let sql = format!(
"COPY DATABASE {} TO {}",
escape_sql_path(source),
escape_sql_path(destination)
);
self.execute_command(&sql)?;
Ok(())
}
pub fn explain(&self, query: &str) -> Result<String> {
let explain_sql = format!("EXPLAIN {query}");
let result = self.execute_query(&explain_sql)?;
let mut lines = Vec::new();
for row in result.rows() {
let row = row?;
if let Some(line) = row.get::<String>(0) {
lines.push(line);
}
}
Ok(lines.join("\n"))
}
pub fn explain_analyze(&self, query: &str) -> Result<String> {
let explain_sql = format!("EXPLAIN ANALYZE {query}");
let result = self.execute_query(&explain_sql)?;
let mut lines = Vec::new();
for row in result.rows() {
let row = row?;
if let Some(line) = row.get::<String>(0) {
lines.push(line);
}
}
Ok(lines.join("\n"))
}
pub fn tcp_client(&self) -> Option<&Client> {
match &self.transport {
Transport::Tcp(tcp) => Some(&tcp.client),
Transport::Grpc(_) => None,
}
}
pub(crate) fn transport(&self) -> &Transport {
&self.transport
}
pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
self.prepare_typed(query, &[])
}
pub fn prepare_typed(
&self,
query: &str,
param_types: &[crate::Oid],
) -> Result<crate::PreparedStatement<'_>> {
let client = match &self.transport {
Transport::Tcp(tcp) => &tcp.client,
Transport::Grpc(_) => {
return Err(Error::new(
"prepared statements are not supported over gRPC transport",
));
}
};
let inner = client.prepare_typed(query, param_types)?;
crate::PreparedStatement::new(self, inner)
}
pub fn is_alive(&self) -> bool {
match &self.transport {
Transport::Tcp(tcp) => tcp.client.is_alive(),
Transport::Grpc(_) => true, }
}
pub fn ping(&self) -> Result<()> {
self.execute_command("SELECT 1")?;
Ok(())
}
pub fn process_id(&self) -> i32 {
match &self.transport {
Transport::Tcp(tcp) => tcp.client.process_id(),
Transport::Grpc(_) => 0,
}
}
pub fn secret_key(&self) -> i32 {
match &self.transport {
Transport::Tcp(tcp) => tcp.client.secret_key(),
Transport::Grpc(_) => 0,
}
}
pub fn parameter_status(&self, name: &str) -> Option<String> {
match &self.transport {
Transport::Tcp(tcp) => tcp.client.parameter_status(name),
Transport::Grpc(_) => None, }
}
pub fn set_notice_receiver(
&mut self,
receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
) {
match &mut self.transport {
Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
Transport::Grpc(_) => {} }
}
pub fn cancel(&self) -> Result<()> {
match &self.transport {
Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
Transport::Grpc(_) => Err(Error::new(
"Query cancellation is not yet supported for gRPC connections.",
)),
}
}
pub fn close(self) -> Result<()> {
let detach_err = if let Some(ref db_path) = self.database {
let db_alias = std::path::Path::new(db_path)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("db");
self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
.err()
} else {
None
};
let close_result = match self.transport {
Transport::Tcp(tcp) => tcp.client.close(),
Transport::Grpc(_) => Ok(()), };
if let Err(e) = close_result {
return Err(Error::with_cause("Failed to close connection", e));
}
if let Some(e) = detach_err {
return Err(Error::with_cause(
"Failed to detach database during close",
e,
));
}
Ok(())
}
pub fn unload_database(&self) -> Result<()> {
self.execute_command("UNLOAD DATABASE")?;
Ok(())
}
pub fn unload_release(&self) -> Result<()> {
self.execute_command("UNLOAD RELEASE")?;
Ok(())
}
pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
self.stats_provider = Some(Arc::new(provider));
}
pub fn disable_query_stats(&mut self) {
self.stats_provider = None;
if let Ok(mut guard) = self.pending_stats.lock() {
*guard = None;
}
}
pub fn last_query_stats(&self) -> Option<QueryStats> {
let provider = self.stats_provider.as_ref()?;
let mut guard = self.pending_stats.lock().ok()?;
let (token, sql) = guard.take()?;
provider.after_query(token, &sql)
}
fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
self.stats_provider.as_ref().map(|p| p.before_query(sql))
}
fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
if let Some(token) = token {
if let Ok(mut guard) = self.pending_stats.lock() {
*guard = Some((token, sql.to_string()));
}
}
}
}
impl Connection {
pub fn begin_transaction(&self) -> Result<()> {
self.execute_command("BEGIN TRANSACTION")?;
Ok(())
}
pub fn commit(&self) -> Result<()> {
self.execute_command("COMMIT")?;
Ok(())
}
pub fn rollback(&self) -> Result<()> {
self.execute_command("ROLLBACK")?;
Ok(())
}
pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
crate::Transaction::new(self)
}
}
fn is_already_exists_error(err: &Error) -> bool {
err.sqlstate()
.is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
}